Update Security Groups Automatically Using AWS Lambda
1. In the AWS Management Console, use the AWS search bar to search for EC2, and then choose the service from the list.
2. In the left navigation pane, click Security Groups.
3. Click Create security group then configure:
Security group name: AutoUpdateSecurityGroup-Global-A
Description: AutoUpdateSecurityGroup-Global
VPC: Lab VPC
4. Click Add new tag and configure:
Key: Name
Value: cloudfront_g (for global rules)
Click Add new tag again and enter:
Key: AutoUpdate
Value: true
Click Add new tag again and enter:
Key: Protocol
Value: http
5. Click Create security group.
6. Repeat the preceding steps to create a security group named
AutoUpdateSecurityGroup-Global-B. All of the other entries remain the same.
7. Click Create security group then configure:
Security group name: AutoUpdateSecurityGroup-Region
Description: AutoUpdateSecurityGroup-Region
VPC: Lab VPC
Scroll down to the Tags - optional section.
Note: Tags are case sensitive, so use copy and paste to enter the actual values from the following instructions.
8. Click Add new tag and configure:
Key: Name
Value: cloudfront_r (for regional rules)
Click Add new tag again and enter:
Key: AutoUpdate
Value: true
Click Add new tag again and enter:
Key: Protocol
Value: http
9. Click Create security group.
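The same three security groups could also be created from a script rather than the console. The following is a minimal sketch, not part of the original walkthrough: it assumes boto3 is installed and AWS credentials are configured, and the helper names build_group_request and create_groups, as well as the vpc_id argument (the ID of Lab VPC, which you would need to look up), are hypothetical.

```python
# Sketch: build create_security_group requests for the three lab groups.
# Tag keys and values are copied from the console steps above.
COMMON_TAGS = {"AutoUpdate": "true", "Protocol": "http"}

GROUPS = [
    # (group name, description, Name tag)
    ("AutoUpdateSecurityGroup-Global-A", "AutoUpdateSecurityGroup-Global", "cloudfront_g"),
    ("AutoUpdateSecurityGroup-Global-B", "AutoUpdateSecurityGroup-Global", "cloudfront_g"),
    ("AutoUpdateSecurityGroup-Region", "AutoUpdateSecurityGroup-Region", "cloudfront_r"),
]


def build_group_request(name, description, name_tag, vpc_id):
    """Return keyword arguments for the EC2 create_security_group call."""
    tags = {"Name": name_tag, **COMMON_TAGS}
    return {
        "GroupName": name,
        "Description": description,
        "VpcId": vpc_id,
        "TagSpecifications": [{
            "ResourceType": "security-group",
            "Tags": [{"Key": key, "Value": value} for key, value in tags.items()],
        }],
    }


def create_groups(vpc_id):
    """Create all three groups and return their IDs (needs boto3 and credentials)."""
    import boto3  # imported lazily so the request builder works without boto3

    ec2 = boto3.client("ec2")
    return [
        ec2.create_security_group(**build_group_request(name, desc, tag, vpc_id))["GroupId"]
        for name, desc, tag in GROUPS
    ]
```

Calling create_groups with the Lab VPC ID would create the groups; build_group_request on its own only assembles the request, which makes the exact, case-sensitive tag values easy to inspect before anything is created.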
Create the Lambda function
1. In the AWS Management Console, use the AWS search bar to search for Lambda, and then choose the service from the list.
2. Click Create function
3. Choose Author from scratch
Configure the following:
Function name: SecurityGroupAutoUpdate
Runtime: Python 3.11
4. Expand Change default execution role
Execution role: Select Use an existing role
Existing role: Choose lambda-role
5. Click Create function.
6. In the Code section, delete all of the code that appears in the Code source lambda_function.py file.
7. Copy and paste the following code into the code editor:
import boto3
import json
import urllib3

http = urllib3.PoolManager()

# Name of the service, as seen in the ip-ranges.json file, to extract information for
SERVICE = "CLOUDFRONT"
# Ports your application uses that need inbound permissions from the service
INGRESS_PORTS = {'Http': 80}
# Tags which identify the security groups you want to update
SECURITY_GROUP_TAG_FOR_GLOBAL_HTTP = {'Name': 'cloudfront_g', 'AutoUpdate': 'true', 'Protocol': 'http'}
SECURITY_GROUP_TAG_FOR_REGION_HTTP = {'Name': 'cloudfront_r', 'AutoUpdate': 'true', 'Protocol': 'http'}
# Maximum number of rules written to a single security group
SG_RULES_LIMIT = 60


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    message = json.loads(event['Records'][0]['Sns']['Message'])

    # Load the IP ranges from the URL in the notification
    ip_ranges = json.loads(get_ip_groups_json(message['url'], message['md5']))

    # Extract the service ranges
    global_cf_ranges = get_ranges_for_service(ip_ranges, SERVICE, "GLOBAL")
    region_cf_ranges = get_ranges_for_service(ip_ranges, SERVICE, "REGION")

    # Split the global list into two chunks, one per cloudfront_g group.
    # Both chunks are always built so that each group receives a list,
    # even when the total is under the limit (the second chunk is then empty).
    global_cf_ranges = [global_cf_ranges[:SG_RULES_LIMIT], global_cf_ranges[SG_RULES_LIMIT:]]

    # Limit the regional list to the rule limit
    region_cf_ranges = region_cf_ranges[:SG_RULES_LIMIT]

    ip_ranges = {"GLOBAL": global_cf_ranges, "REGION": region_cf_ranges}

    result = update_security_groups(ip_ranges)
    return result


def get_ip_groups_json(url, expected_hash):
    # Note: expected_hash is accepted but not verified in this version.
    print("Updating from " + url)
    response = http.request('GET', url)
    ip_json = response.data
    return ip_json


def get_ranges_for_service(ranges, service, subset):
    # subset "GLOBAL" selects prefixes whose region is GLOBAL;
    # any other subset selects every non-GLOBAL (regional) prefix.
    service_ranges = list()
    for prefix in ranges['prefixes']:
        if prefix['service'] == service and ((subset == prefix['region'] and subset == "GLOBAL") or (
                subset != 'GLOBAL' and prefix['region'] != 'GLOBAL')):
            print('Found ' + service + ' region: ' + prefix['region'] + ' range: ' + prefix['ip_prefix'])
            service_ranges.append(prefix['ip_prefix'])
    return service_ranges


def update_security_groups(new_ranges):
    client = boto3.client('ec2')

    global_http_group = get_security_groups_for_update(client, SECURITY_GROUP_TAG_FOR_GLOBAL_HTTP)
    region_http_group = get_security_groups_for_update(client, SECURITY_GROUP_TAG_FOR_REGION_HTTP)
    print('Found ' + str(len(global_http_group)) + ' CloudFront_g HttpSecurityGroups to update')
    print('Found ' + str(len(region_http_group)) + ' CloudFront_r HttpSecurityGroups to update')

    result = list()
    global_http_updated = 0
    region_http_updated = 0

    # Pair each global group with one chunk; zip stops at the shorter sequence,
    # so extra groups cannot cause an IndexError.
    for group, ranges in zip(global_http_group, new_ranges["GLOBAL"]):
        if update_security_group(client, group, ranges, INGRESS_PORTS['Http']):
            global_http_updated += 1
            result.append('Updated ' + group['GroupId'])

    for group in region_http_group:
        if update_security_group(client, group, new_ranges["REGION"], INGRESS_PORTS['Http']):
            region_http_updated += 1
            result.append('Updated ' + group['GroupId'])

    result.append('Updated ' + str(global_http_updated) + ' of ' + str(
        len(global_http_group)) + ' CloudFront_g HttpSecurityGroups')
    result.append('Updated ' + str(region_http_updated) + ' of ' + str(
        len(region_http_group)) + ' CloudFront_r HttpSecurityGroups')

    return result


def update_security_group(client, group, new_ranges, port):
    added = 0
    removed = 0

    if len(group['IpPermissions']) > 0:
        for permission in group['IpPermissions']:
            if permission['FromPort'] <= port and permission['ToPort'] >= port:
                old_prefixes = list()
                to_revoke = list()
                to_add = list()
                # Revoke existing CIDRs that are no longer in the published list
                for ip_range in permission['IpRanges']:
                    cidr = ip_range['CidrIp']
                    old_prefixes.append(cidr)
                    if cidr not in new_ranges:
                        to_revoke.append(ip_range)
                        print(group['GroupId'] + ": Revoking " + cidr + ":" + str(permission['ToPort']))

                # Add published CIDRs that the group does not have yet
                for cidr in new_ranges:
                    if cidr not in old_prefixes:
                        to_add.append({'CidrIp': cidr})
                        print(group['GroupId'] + ": Adding " + cidr + ":" + str(permission['ToPort']))

                removed += revoke_permissions(client, group, permission, to_revoke)
                added += add_permissions(client, group, permission, to_add)
    else:
        # The group has no rules yet: add every range on the target port
        to_add = list()
        for cidr in new_ranges:
            to_add.append({'CidrIp': cidr})
            print(group['GroupId'] + ": Adding " + cidr + ":" + str(port))
        permission = {'ToPort': port, 'FromPort': port, 'IpProtocol': 'tcp'}
        added += add_permissions(client, group, permission, to_add)

    print(group['GroupId'] + ": Added " + str(added) + ", Revoked " + str(removed))
    return added > 0 or removed > 0


def revoke_permissions(client, group, permission, to_revoke):
    if len(to_revoke) > 0:
        revoke_params = {
            'ToPort': permission['ToPort'],
            'FromPort': permission['FromPort'],
            'IpRanges': to_revoke,
            'IpProtocol': permission['IpProtocol']
        }
        client.revoke_security_group_ingress(GroupId=group['GroupId'], IpPermissions=[revoke_params])
    return len(to_revoke)


def add_permissions(client, group, permission, to_add):
    if len(to_add) > 0:
        add_params = {
            'ToPort': permission['ToPort'],
            'FromPort': permission['FromPort'],
            'IpRanges': to_add,
            'IpProtocol': permission['IpProtocol']
        }
        client.authorize_security_group_ingress(GroupId=group['GroupId'], IpPermissions=[add_params])
    return len(to_add)


def get_security_groups_for_update(client, security_group_tag):
    # Build one tag filter per key so that a group must carry all of the tags
    filters = list()
    for key, value in security_group_tag.items():
        filters.append({'Name': "tag:" + key, 'Values': [value]})
    response = client.describe_security_groups(Filters=filters)
    return response['SecurityGroups']
Examine the code. It performs the following steps:
Gets the latest IP ranges from the URL given in the SNS notification.
Filters the IP ranges for the CloudFront service, separating GLOBAL ranges from regional ones.
Updates any VPC security groups that are tagged with “Name”: “cloudfront_g” or “cloudfront_r”, “AutoUpdate”: “true”, and “Protocol”: “http”.
8. Click Deploy
9. Click the Configuration tab. In the General configuration section, click Edit and configure:
Description: Automatically update Security Group
Timeout: 1 min 0 sec
10. Click Save
11. Scroll up to the Function overview section.
12. Click Add trigger then configure:
Select a trigger: SNS
SNS topic: arn:aws:sns:us-east-1:806199016981:AmazonIpSpaceChanged
13. Click Add.
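To make the filtering and splitting performed by the function easier to follow, here is a small standalone sketch that runs the same idea on a hand-made payload. The helper name split_ranges is hypothetical, the sample prefixes are documentation addresses rather than real CloudFront ranges, and this sketch always returns two chunks for the global list (one per cloudfront_g group), which is a simplifying choice made here.

```python
SG_RULES_LIMIT = 60  # same per-group rule budget the function uses


def split_ranges(ip_ranges, service="CLOUDFRONT", limit=SG_RULES_LIMIT):
    """Classify a service's prefixes as GLOBAL or regional, then cap them."""
    global_ranges, region_ranges = [], []
    for prefix in ip_ranges["prefixes"]:
        if prefix["service"] != service:
            continue  # ignore other services such as EC2
        target = global_ranges if prefix["region"] == "GLOBAL" else region_ranges
        target.append(prefix["ip_prefix"])
    return {
        # Two chunks for the global list: the first `limit` entries, then the rest.
        "GLOBAL": [global_ranges[:limit], global_ranges[limit:]],
        # The regional list is simply truncated to the limit.
        "REGION": region_ranges[:limit],
    }


# Made-up sample in the shape of ip-ranges.json (illustrative values only).
sample = {"prefixes": [
    {"ip_prefix": "198.51.100.0/24", "region": "GLOBAL", "service": "CLOUDFRONT"},
    {"ip_prefix": "203.0.113.0/24", "region": "us-east-1", "service": "CLOUDFRONT"},
    {"ip_prefix": "192.0.2.0/24", "region": "us-east-1", "service": "EC2"},
]}

print(split_ranges(sample, limit=1))
```

With the limit lowered to 1 for the demonstration, the single global prefix lands in the first chunk, the second chunk is empty, and the EC2 prefix is dropped.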
Test a Lambda function with sample events
1. Click the Test tab and configure a test event, which you can use to simulate a Lambda trigger. In this case, you simulate an update of the CloudFront IP address ranges.
2. Select Create new event.
3. For Event name, enter Notification.
4. Delete the existing test code.
5. Copy and paste the following test code into the test event window:
{
  "Records": [
    {
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:EXAMPLE",
      "EventSource": "aws:sns",
      "Sns": {
        "SignatureVersion": "1",
        "Timestamp": "1970-01-01T00:00:00.000Z",
        "Signature": "EXAMPLE",
        "SigningCertUrl": "EXAMPLE",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "Message": "{\"create-time\": \"yyyy-mm-ddThh:mm:ss+00:00\", \"synctoken\": \"0123456789\", \"md5\": \"7fd59f5c7f5cf643036cbd4443ad3e4b\", \"url\": \"https://ip-ranges.amazonaws.com/ip-ranges.json\"}",
        "Type": "Notification",
        "UnsubscribeUrl": "EXAMPLE",
        "TopicArn": "arn:aws:sns:EXAMPLE",
        "Subject": "TestInvoke"
      }
    }
  ]
}
6. Click Save.
7. Click Test
8. Expand Details to view the output of the Lambda function.
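Note that the Message field of the test event is itself a JSON string nested inside the event, which is why the handler decodes it separately. The sketch below, with hypothetical helper names make_test_event and parse_message, builds a trimmed-down event of the same shape and decodes it the way the handler does; the md5 value is the placeholder from the sample event, not a real checksum.

```python
import json


def make_test_event(url, md5):
    """Build a minimal SNS-style event carrying an IP-range change message."""
    message = {
        "create-time": "yyyy-mm-ddThh:mm:ss+00:00",
        "synctoken": "0123456789",
        "md5": md5,
        "url": url,
    }
    return {
        "Records": [{
            "EventSource": "aws:sns",
            "Sns": {
                "Type": "Notification",
                "Subject": "TestInvoke",
                # The inner message is serialized to a string, as SNS delivers it.
                "Message": json.dumps(message),
            },
        }]
    }


def parse_message(event):
    """Decode the JSON string in Records[0].Sns.Message, as the handler does."""
    return json.loads(event["Records"][0]["Sns"]["Message"])


event = make_test_event(
    "https://ip-ranges.amazonaws.com/ip-ranges.json",
    "7fd59f5c7f5cf643036cbd4443ad3e4b",  # placeholder value from the sample event
)
print(parse_message(event)["url"])
```

Only the url and md5 keys of the decoded message are read by the function in this walkthrough; the remaining fields are there to mimic a real notification.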
Verify Security Group Update
1. In the AWS Management Console, use the AWS search bar to search for EC2, and then choose the service from the list.
2. In the left navigation pane, click Security Groups.
3. Select one of the cloudfront_g security groups.
4. In the bottom pane, click the Inbound rules tab.
Note: To see more content in this pane, drag the pane upward.
When you scroll through the list of rules, you now see the CloudFront IP ranges added as allowed inbound traffic.
The AutoUpdateSecurityGroup-Region security group also contains a set of rules.
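The same check could be scripted instead of scrolling through the console. The following is a rough sketch under stated assumptions: inbound_cidrs and fetch_tagged_groups are hypothetical helpers, boto3 and AWS credentials are only needed for the fetch step, and the summary works on any dictionary shaped like a describe_security_groups response.

```python
def inbound_cidrs(describe_response, port=80):
    """Map each group ID to the IPv4 CIDRs allowed inbound on the given port."""
    summary = {}
    for group in describe_response["SecurityGroups"]:
        cidrs = []
        for permission in group.get("IpPermissions", []):
            from_port = permission.get("FromPort")
            to_port = permission.get("ToPort")
            # All-protocol rules carry no port keys; skip them in this sketch.
            if from_port is None or to_port is None:
                continue
            if from_port <= port <= to_port:
                cidrs.extend(r["CidrIp"] for r in permission.get("IpRanges", []))
        summary[group["GroupId"]] = sorted(cidrs)
    return summary


def fetch_tagged_groups(name_tag):
    """Fetch groups by Name and AutoUpdate tags (needs boto3 and credentials)."""
    import boto3  # imported lazily so inbound_cidrs works without boto3

    ec2 = boto3.client("ec2")
    return ec2.describe_security_groups(Filters=[
        {"Name": "tag:Name", "Values": [name_tag]},
        {"Name": "tag:AutoUpdate", "Values": ["true"]},
    ])
```

For example, inbound_cidrs(fetch_tagged_groups("cloudfront_g")) would list the port 80 ranges for both global groups, and the same call with "cloudfront_r" would cover the regional group.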