
Apache Airflow migration journey from self-hosted to AWS Managed Airflow

Published Mar 30, 2022 · Last updated Sep 25, 2022

Introduction

Due to security and compatibility issues encountered while migrating our self-hosted Airflow environment, we decided to move to AWS Managed Workflows for Apache Airflow (MWAA). The old EKS cluster used Istio as an ingress gateway controller; on the new cluster we dropped this in favour of a more managed approach, using the AWS Load Balancer Controller for the majority of ingress and the NGINX ingress controller for any services requiring more complex ingress rules. During the course of the migration we encountered issues with Apache Airflow around correct traffic routing and authentication for the web GUI. We spent several days attempting to resolve these problems before deciding it was better to simply switch to MWAA. This article goes through the specific issues we faced and the journey of migrating to MWAA.

Self-hosted Implementation

The original configuration

Our original implementation of Apache Airflow was deployed onto EKS using the community-maintained Helm chart. Authentication for the web GUI was handled via Google OAuth, over HTTPS — a fairly standard setup. OAuth values were set in the Helm chart and the callback URLs were configured on the GCP side for the application. You can see we are retrieving the Google credentials from a Kubernetes secret which is configured via ExternalSecrets.
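
For reference, that secret can be declared with an ExternalSecret resource along the following lines — a minimal sketch assuming the External Secrets Operator with an AWS Secrets Manager backend; the secret store name and remote key are placeholders:

apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: airflow-google-client-secret
  namespace: airflow
spec:
  secretStoreRef:
    name: aws-secrets-manager        # placeholder SecretStore name
    kind: ClusterSecretStore
  target:
    name: airflow-google-client-secret
  data:
    - secretKey: client_secret
      remoteRef:
        key: airflow/google-oauth    # placeholder path in Secrets Manager
        property: client_secret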

Note: The AIRFLOW__GOOGLE__CLIENT_ID and AIRFLOW__GOOGLE__DOMAIN values below have been replaced with dummy data.

extraEnv:
- name: AIRFLOW__WEBSERVER__AUTHENTICATE
  value: "True"
- name: AIRFLOW__WEBSERVER__AUTH_BACKEND
  value: airflow.contrib.auth.backends.google_auth
- name: AIRFLOW__GOOGLE__CLIENT_ID
  value: 123456789-1245tghu87654esdxcvbhu8765rdcvg.apps.googleusercontent.com
- name: AIRFLOW__GOOGLE__CLIENT_SECRET
  valueFrom:
    secretKeyRef:
      name: airflow-google-client-secret
      key: client_secret
      optional: false
- name: AIRFLOW__GOOGLE__OAUTH_CALLBACK_ROUTE
  value: "/oauth2callback"
- name: AIRFLOW__GOOGLE__DOMAIN
  value: company.com
- name: AIRFLOW__GOOGLE__PROMPT
  value: "select_account consent"

Our web base URL is configured with the HTTPS protocol:

web:
  baseUrl: https://airflow.prod.company.io

And we have an Istio VirtualService for ingress, using our configured Istio external ingress gateway.

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: airflow
  namespace: airflow
spec:
  hosts:
  - airflow.prod.company.io
  gateways:
  - istio-system/company-gateway-external
  http:
  - route:
    - destination:
        host: airflow-web

All is working as expected: when navigating in a web browser to https://airflow.prod.company.io you are routed to the Airflow web GUI and prompted to log in using Google as the SSO identity provider.

The Attempted Migration Configuration

The configuration for the migration was very similar, however we opted to use the official Helm chart for the deployment rather than the community chart, which meant refactoring many of the parameters and values to conform to the official chart's spec. In my opinion this is one of the downsides of using Helm to deploy applications in a smaller environment, however discussing the pros and cons of deploying to Kubernetes using Helm is not within the scope of this post.

The majority of the Helm release config is identical, although refactored slightly to conform to the chart-specific templates. However, as mentioned previously, we are no longer using Istio in the new EKS cluster, so instead we configure Service and Ingress resources and use the AWS Load Balancer Controller annotations on the Ingress resource to provision an internal load balancer within AWS for our application (because this service should not be accessible outside of our organisation).

apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow
spec:
  ports:
    - port: 80
      targetPort: 8080
      protocol: TCP
  type: NodePort
  selector:
    app: airflow
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow
  namespace: airflow
  annotations:
    external-dns.alpha.kubernetes.io/ttl: "300"
    external-dns.alpha.kubernetes.io/hostname: airflow.staging.company.io
    kubernetes.io/ingress.class: alb
    alb.ingress.kubernetes.io/scheme: internal
    alb.ingress.kubernetes.io/target-type: ip
    alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:eu-west-1:1234567890:certificate/a01d4f1d-1009-42f4-9f04-c0f98bccffbf
    alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
    alb.ingress.kubernetes.io/ssl-redirect: '443'
    alb.ingress.kubernetes.io/healthcheck-path: '/api/v1/ping/'

spec:
  rules:
    - host: airflow.staging.company.io
      http:
        paths:
        - path: /
          pathType: Prefix
          backend:
            service:
              name: airflow
              port:
                number: 80

As you can see, this configuration specifies both HTTP and HTTPS listeners on the load balancer and attaches an SSL certificate to the HTTPS listener. This certificate contains the DNS names relevant to the service endpoint (*.internal.company.io).

Upon testing this configuration we were unable to access the Airflow web GUI. We received a redirect_uri_mismatch error when attempting to log in via SSO. After some troubleshooting we realised this was because the Airflow application was forwarding our authentication request to Google over HTTP, which is not permitted on Google's side, even though we had specified HTTPS in our URL and configured the base URL of our Airflow application with HTTPS.

This is the error we were seeing (note: this is just an example, not our original URI error):

[Image: example Google redirect_uri_mismatch error page]

To confirm this, we disabled Google OAuth on the deployment and simply used the built-in Airflow RBAC authentication (username and password). This worked successfully and we were able to log in to the admin console, so we knew that our service was running correctly and that our issue was entirely with authentication using Google. We then began a very lengthy troubleshooting process, taking a couple of weeks, which involved:

  • Testing with the latest version of Airflow on both the community and official helm charts.
  • Testing the latest version 1 release of Airflow (1.10.15) and version 2 (2.2.3), in case version 2 had a better security implementation. Sadly this was not the case.
  • Using internal and external loadbalancers
  • Configuring non-default HTTPS ports on the loadbalancers and kubernetes ingress/service
  • Changing the target types of the AWS loadbalancers, along with the service type within Kubernetes
  • Adding and removing HTTP and HTTPS versions of the authorised callback URLs in the Google OAuth section of GCP.
  • Testing loopback URLs

Throughout all of this troubleshooting we tested many different combinations of the configurations above, and we consistently received different errors. Sometimes we would simply see a blank page without any error, and when inspecting the requests in the browser developer tools we would find a 400 error. Other times we would find a 302 redirect, but to an incorrect URL. Sometimes we would see the “circles” built-in error page from Airflow (page not found 404):

[Image: Airflow's built-in “circles” 404 page not found error page]

Along the way, we were of course consulting the documentation for both the community chart and the official chart.

Community chart: https://github.com/airflow-helm/charts/tree/main/charts/airflow

Official documentation: https://airflow.apache.org/docs/apache-airflow/stable/index.html

The official documentation for Apache Airflow is not great in my opinion. The navigation is difficult, and it is often more effective to simply Google what you are looking for and follow a link into the documentation than to try to find it within the documentation itself. I would also say that I have never read documentation with so many sections and such an organised navigation structure that actually says so little. It is very difficult to find meaningful explanations for certain aspects of Airflow, and as such we had to resort to other online resources a lot during the troubleshooting process. We discovered many other people encountering the same problems, not just with Google OAuth but with any kind of third-party SSO identity provider. It seems that Airflow is simply not built well to handle third-party SSO and is geared towards encouraging people to use the standard built-in password authentication method.

It also seems that Airflow does not properly support HTTPS/SSL termination, as the documentation describes simply setting an HTTP endpoint with :443 as the method for “enabling” HTTPS… https://airflow.apache.org/docs/apache-airflow/stable/security/webserver.html#ssl
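
For completeness, the documented SSL settings can be passed through the chart as environment variables, along these lines (a sketch only; the certificate paths are placeholders for files mounted into the webserver pod). Note that this terminates TLS inside the webserver itself rather than at the load balancer:

extraEnv:
- name: AIRFLOW__WEBSERVER__WEB_SERVER_SSL_CERT
  value: /certs/tls.crt   # placeholder path to a mounted certificate
- name: AIRFLOW__WEBSERVER__WEB_SERVER_SSL_KEY
  value: /certs/tls.key   # placeholder path to the matching private key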

How did it work before?

This is a question we continuously asked ourselves throughout this journey. In the whole painful process, it is easy to forget that we already had Airflow deployed and working in production using Google OAuth. So how was this possible, when it seems that the application does not properly support HTTPS, a requirement of Google OAuth?

The only explanation we came up with was that in our original EKS cluster, which was using Istio for ingress, we were simply lucky. We believe that because the Istio sidecar proxies all requests in and out of the pod, re-routing them to localhost and adding additional headers along the way, it somehow managed to force an HTTPS connection upon egress from the pod, allowing Google OAuth to work. If we had not been using Istio in the original EKS setup, we believe the Airflow/Google OAuth deployment would have been much more difficult.
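
If you are terminating TLS in front of Airflow yourself, the closest built-in equivalent we are aware of is telling the webserver to trust the proxy's X-Forwarded-* headers via the enable_proxy_fix setting. A sketch of what that would look like in the chart values (we did not verify this end-to-end in our setup):

extraEnv:
- name: AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX
  value: "True"   # trust X-Forwarded-Proto/Host/Port headers set by the load balancer or sidecar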

Implementation of AWS Managed Airflow

So we now come to the interesting part of this post. We decided that because of all of the above headaches, it would be much simpler to use a managed solution provided by our cloud hosting service which we are already using.

We are using Terraform to create the underlying infrastructure required for the Airflow environment, and since this is managed within AWS, we simply need to give it access to our new EKS cluster via an IAM policy and the aws-auth configmap. This allows the Airflow service IAM role to spin up pods in the cluster to run the jobs, identically to how Airflow was working in the original EKS cluster.

Locals

We have configured a name_prefix local for the module, which joins our Service and Environment tags together and makes it easy to apply a quick, consistent naming convention to all resources in the module:

locals {
  name_prefix = "${var.tags["Service"]}-${var.tags["Environment"]}"
}

First, we need a VPC for the Airflow environment to be deployed into, and for this we can use the official AWS VPC module for Terraform: https://registry.terraform.io/modules/terraform-aws-modules/vpc/aws/latest. Our subnets are defined by variables in the per-environment .tfvars files.

module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "3.11.0"
  name    = "${local.name_prefix}-vpc"
  cidr    = var.vpc_cidr

  azs                 = var.azs
  private_subnets     = var.private_subnets
  public_subnets      = var.public_subnets
  enable_nat_gateway  = true
  single_nat_gateway  = true
  reuse_nat_ips       = true # <= Skip creation of EIPs for the NAT Gateways
  external_nat_ip_ids = [aws_eip.nat.id]

  tags = merge(var.tags, { Name = "${local.name_prefix}-vpc" }, )
}

We also have a single NAT gateway for the deployment:

resource "aws_eip" "nat" {
  vpc  = true
  tags = merge(var.tags, { Name = "${local.name_prefix}-private_nat-gateway" }, )
}

A security group with default egress rule and the self ingress rule is sufficient:

resource "aws_security_group" "airflow_security_group" {
  name_prefix = "${local.name_prefix}-sg-"
  description = "Security group used for airflow envionment"
  vpc_id      = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = merge(var.tags, { Name = "${local.name_prefix}-sg" }, )
}

resource "aws_security_group_rule" "allow_self" {
  type              = "ingress"
  to_port           = 0
  protocol          = "-1"
  from_port         = 0
  security_group_id = aws_security_group.airflow_security_group.id
  self              = true
}

resource "aws_security_group_rule" "allow_egress" {
  type              = "egress"
  to_port           = 0
  protocol          = "-1"
  from_port         = 0
  cidr_blocks       = ["0.0.0.0/0"]
  ipv6_cidr_blocks  = ["::/0"]
  security_group_id = aws_security_group.airflow_security_group.id
}

S3

We create an S3 bucket where the Airflow DAGs will be stored, as well as any requirements.txt and plugins.zip objects. With MWAA, it is not sufficient to simply have a plugins folder in the S3 bucket as is usually the case with self-hosted Airflow; the plugins must be provided as a .zip file in the Airflow environment config.

resource "aws_s3_bucket" "airflow_dags_bucket" {
  bucket = "company-${local.name_prefix}"
  acl    = "private"

  versioning {
    enabled = true
  }

  tags = var.tags
}

resource "aws_s3_bucket_public_access_block" "airflow_dags_s3_public_access_block" {
  bucket                  = aws_s3_bucket.airflow_dags_bucket.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

IAM

IAM implementation for Airflow is fairly straightforward; AWS provides examples to use for your IAM service role, KMS key, etc.: https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-create-role.html#mwaa-create-role-how-create-role

First we can define a couple of housekeeping data resources to make our life easier: the AWS account ID and the region. These will be populated automatically because our AWS Terraform provider is already authenticated with AWS via our deployment pipeline.

data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

Next we can define the IAM trust policy for our service role; this is also provided by AWS in the docs above.

data "aws_iam_policy_document" "airflow_trust_policy" {
  statement {
    sid    = "AssumeRole"
    effect = "Allow"

    actions = [
      "sts:AssumeRole"
    ]

    principals {
      type        = "Service"
      identifiers = ["airflow.amazonaws.com", "airflow-env.amazonaws.com"]
    }
  }
}

We can then define our Airflow IAM policy, where we make use of the above-mentioned data resources for the account ID and region:

data "aws_iam_policy_document" "airflow_policy" {
  statement {
    effect = "Allow"
    actions = [
      "airflow:PublishMetrics"
    ]

    resources = [
      "arn:aws:airflow:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:environment/${local.name_prefix}"
    ]
  }

  statement {
    effect = "Deny"
    actions = [
      "s3:ListAllMyBuckets"
    ]

    resources = [
      aws_s3_bucket.airflow_dags_bucket.arn
    ]
  }

  statement {
    effect = "Allow"
    actions = [
      "s3:GetBucket*",
      "s3:ListBucket",
      "s3:GetBucketPublicAccessBlock"
    ]

    resources = [
      aws_s3_bucket.airflow_dags_bucket.arn
    ]
  }
...

We also have a KMS key policy for the KMS key used by Airflow. Note: When creating KMS key policies, ensure that you give a role or user full permissions to administer the key, otherwise you can find yourself with a KMS key which is unmanageable: https://aws.amazon.com/premiumsupport/knowledge-center/update-key-policy-future/.
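
As a sketch, the key and its policy look roughly like the following (the statements are trimmed, and the admin principal here is an assumption you should replace with a role you control):

resource "aws_kms_key" "airflow" {
  description = "${local.name_prefix}-kms-key"
  policy      = data.aws_iam_policy_document.airflow_kms_key_policy.json
  tags        = merge(var.tags, { Name = "${local.name_prefix}-kms-key" }, )
}

data "aws_iam_policy_document" "airflow_kms_key_policy" {
  # Without an administrative statement like this, the key can become unmanageable
  statement {
    sid       = "KeyAdministration"
    effect    = "Allow"
    actions   = ["kms:*"]
    resources = ["*"]

    principals {
      type        = "AWS"
      identifiers = ["arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"]
    }
  }

  # Allow the Airflow service to use the key for encrypting data and logs at rest
  statement {
    sid    = "AirflowKmsUsage"
    effect = "Allow"
    actions = [
      "kms:Encrypt",
      "kms:Decrypt",
      "kms:DescribeKey",
      "kms:GenerateDataKey*"
    ]
    resources = ["*"]

    principals {
      type        = "Service"
      identifiers = ["airflow.amazonaws.com", "airflow-env.amazonaws.com"]
    }
  }
}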

Airflow Environment

For the Airflow environment itself we can use the aws_mwaa_environment Terraform resource. Below is the bulk of the MWAA config. You can see that we are providing the requirements.txt and plugins.zip objects contained in the DAGs S3 bucket (you can choose to create these manually or with Terraform), adding the execution_role_arn of the IAM role we created, and configuring the worker parameters via per-environment variables. Finally, the network configuration uses the VPC subnets and security groups we defined.

resource "aws_mwaa_environment" "airflow_environment" {
  name = local.name_prefix

  airflow_configuration_options = {
    "core.default_task_retries" = 16
    "core.parallelism"          = 1
  }

  source_bucket_arn    = aws_s3_bucket.airflow_dags_bucket.arn
  dag_s3_path          = "dags/"
  plugins_s3_path      = "plugins.zip"
  requirements_s3_path = "requirements.txt"

  execution_role_arn = aws_iam_role.airflow.arn

  kms_key = aws_kms_key.airflow.arn

  webserver_access_mode = "PUBLIC_ONLY"

  environment_class = var.environment_class
  min_workers       = var.min_workers
  max_workers       = var.max_workers

  network_configuration {
    security_group_ids = [aws_security_group.airflow_security_group.id]
    subnet_ids         = slice(module.vpc.private_subnets, 0, 2)
  }
}

You can optionally configure logging for the environment:

dynamic "logging_configuration" {
  for_each = anytrue([var.dag_processing_logs_enabled, var.scheduler_logs_enabled, var.task_logs_enabled, var.webserver_logs_enabled, var.worker_logs_enabled]) ? [1] : []
  content {
    dynamic "dag_processing_logs" {
      for_each = var.dag_processing_logs_enabled ? [1] : []
      content {
        enabled   = var.dag_processing_logs_enabled
        log_level = var.dag_processing_logs_log_level
      }
    }

    dynamic "scheduler_logs" {
      for_each = var.scheduler_logs_enabled ? [1] : []
      content {
        enabled   = var.scheduler_logs_enabled
        log_level = var.scheduler_logs_log_level
      }
    }

    dynamic "task_logs" {
      for_each = var.task_logs_enabled ? [1] : []
      content {
        enabled   = var.task_logs_enabled
        log_level = var.task_logs_log_level
      }
    }
  }
...

Outputs

We have some useful outputs configured that we can use for reference if we need to expand our module in the future, or integrate other services with Airflow:

output "private_subnets" {
  value = module.vpc.private_subnets
}

output "private_subnet_cidr_blocks" {
  value = module.vpc.private_subnets_cidr_blocks
}

output "public_subnets" {
  value = module.vpc.public_subnets
}

output "public_subnet_cidr_blocks" {
  value = module.vpc.public_subnets_cidr_blocks
}

output "security_group_default" {
  value = module.vpc.default_security_group_id
}

Using Managed Airflow

Importing DAGs

Now that we have our MWAA environment set up we can begin importing our DAG files. We can do this simply by uploading them to the S3 bucket created by the Terraform module. The DAG files need to go into a folder in the bucket named “dags”, and can be nested within sub-folders if preferred.
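
If you prefer the CLI over the console, something like the following keeps a local dags/ directory in sync with the bucket (the bucket name here is illustrative):

aws s3 sync ./dags s3://company-airflow-staging/dags/ --delete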

[Image: DAG files uploaded to the dags/ folder of the S3 bucket]

Dependencies and Plugins

Dependencies and plugins are controlled by two files, a requirements.txt file and a plugins.zip file, both of which need to sit in the root of the S3 bucket for our DAGs.
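
A minimal way to produce and upload these two objects (bucket name and paths are illustrative):

cd plugins && zip -r ../plugins.zip . && cd ..   # MWAA expects the plugin files at the root of the zip
aws s3 cp plugins.zip s3://company-airflow-staging/plugins.zip
aws s3 cp requirements.txt s3://company-airflow-staging/requirements.txt

Since versioning is enabled on the bucket, the aws_mwaa_environment resource also accepts plugins_s3_object_version and requirements_s3_object_version arguments if you want to pin the exact object versions the environment uses.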

[Image: requirements.txt and plugins.zip objects in the root of the S3 bucket]

The Airflow UI

The Airflow UI is identical to what you will see if you are deploying your own Airflow service. You can access it by navigating to the managed Airflow service inside the AWS console and selecting the UI link on the environment which has been created.

[Image: the Airflow UI link on the MWAA environment in the AWS console]
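
If you need to script access rather than click through the console, the AWS CLI can also generate a short-lived web login token for an environment, for example (the environment name here is illustrative):

aws mwaa create-web-login-token --name airflow-staging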

Note: Your user/role must have access to MWAA in order to be able to log in to the UI. The permissions required are:

{
  "Sid": "AirflowAccess",
  "Effect": "Allow",
  "Action": [
    "airflow:CreateWebLoginToken",
    "airflow:GetEnvironment",
    "airflow:ListEnvironments"
  ],
  "Resource": ["<your airflow environment ARN>"]
},
{
  "Sid": "AirflowListEnvs",
  "Effect": "Allow",
  "Action": "airflow:ListEnvironments",
  "Resource": "*"
}

And for the S3 DAGs bucket created:

{
  "Sid": "DagsBucket",
  "Effect": "Allow",
  "Action": [
    "s3:ListAllMyBuckets",
    "s3:GetBucketPolicy",
    "s3:ListBucket",
    "s3:GetObject",
    "s3:GetObjectRetention",
    "s3:GetObjectVersion",
    "s3:PutObject",
    "s3:RestoreObject"
  ],
  "Resource": ["<your dags bucket ARN>"]
}

Using MWAA with EKS

When we were running our own hosted Airflow service within our EKS cluster, we didn’t need to worry about certain things which are required when using MWAA with EKS. Our use case (and most others) is that when a DAG task runs, rather than running on the worker node in the MWAA environment, the task simply spins up a pod in our EKS cluster, with a container entrypoint of the command which actually executes the task at hand. To accomplish this using MWAA, there are some additional steps required, which are documented here: https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-eks-example.html

AWS Auth Configmap

If you are using EKS, you are probably familiar with the aws-auth configmap. This is the built-in configmap which allows you to map AWS IAM roles to Kubernetes users/groups. We need to add a mapping for the Airflow IAM role which is used by the MWAA environment.

We use Terraform to create our configmap in our EKS clusters:

{
  rolearn  = "arn:aws:iam::1234567890:role/airflow-staging-role",
  username = "mwaa-service",
  groups = [
    "company:mwaa"
  ]
}

This translates to a YAML format of:

- "groups":
  - "company:mwaa"
  "rolearn": "arn:aws:iam::1234567890:role/airflow-staging-role"
  "username": "mwaa-service"

Next we need to create a kubeconfig for the mwaa-service user, which we can do using the aws eks update-kubeconfig command:

aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws

We need to provide this kubeconfig file to Airflow by adding it to the root of the DAGs bucket and then specifying the location of the file within the DAG parameters:

config_file=<s3 bucket path relative to root>
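
Tying this together, a DAG task that launches a pod in EKS ends up looking roughly like the sketch below. This assumes the cncf-kubernetes provider is installed via requirements.txt; the namespace, image and kubeconfig path are illustrative and should be adjusted to your own cluster and bucket layout:

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    dag_id="eks_pod_example",
    start_date=datetime(2022, 3, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_task = KubernetesPodOperator(
        task_id="run_task",
        namespace="mwaa",                # namespace the MWAA role is mapped to in EKS
        image="1234567890.dkr.ecr.eu-west-1.amazonaws.com/company/task-image:latest",  # illustrative image
        cmds=["python", "run_task.py"],  # container entrypoint that actually executes the task
        config_file="kube_config.yaml",  # the kubeconfig uploaded above; adjust to where it lands on the workers
        in_cluster=False,                # connecting to a remote EKS cluster, not the MWAA workers
        is_delete_operator_pod=True,
        get_logs=True,
    )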

The Final Hurdle

We actually spent days trying to troubleshoot this last error, including having a support case open with AWS in which they were also struggling to understand the cause. The error we were seeing when the DAGs we were testing with tried to run was a CloudWatch error:

*** Reading remote log from Cloudwatch log_group: airflow-airflow-staging-Task log_stream: era5_pipeline/copy_air_temp_2m_task/2022-02-24T10_53_11.815092+00_00/3.log.
Could not read remote logs from log_group: airflow-airflow-staging-Task log_stream: era5_pipeline/copy_air_temp_2m_task/2022-02-24T10_53_11.815092+00_00/3.log.

This error makes it seem like a permissions issue exists with the IAM role being used by Airflow to read from CloudWatch; however, upon checking CloudWatch, none of the log streams had ever been created in the first place. AWS actually have a knowledge page regarding this error, as it is a known bug, and they provide a few solutions which should solve it. You can read about this here: https://docs.aws.amazon.com/mwaa/latest/userguide/t-cloudwatch-cloudtrail-logs.html#t-task-fail-permission

None of the provided solutions worked for us, and after days of trying different things with AWS and ourselves, we managed to narrow down the problem to a specific custom dependency function which was being used in these DAGs:

default_args = build_default_args(**task_args)

The build_default_args function was causing this issue. We managed to find this out by systematically copying small segments of the DAG into a new file and running them one iteration at a time, until we came across the problem line. We then tested without using this custom function and the DAG was able to run correctly.

Conclusion

After many, many hours of troubleshooting the CloudWatch error above, it seemed like AWS MWAA was going to be even more trouble than trying to migrate our own in-house hosted Airflow service, especially with the added wild-goose chase caused by the fact that the error we were seeing is a known issue within AWS, so even their own support engineers believed it to be a problem on their side. The final realisation that it was actually a problem with some of our own custom code was disheartening, however it was a learning experience for sure, and it has allowed me to share that information with you so that you might not get stuck on a similar issue.

Other than the above-mentioned pitfall, the actual deployment of MWAA, the uploading and parsing of DAG files, and access to the UI were all fairly straightforward. Even the Kubernetes authentication config is fairly trivial with the aws eks update-kubeconfig command.

The pricing for MWAA is fairly good in my opinion for our use case, and when weighed up against the time and cost of engineers supporting and maintaining in-house hosted Airflow, it made sense for us; this was one of the deciding factors in our decision to migrate to the managed Airflow service. Your mileage may vary, however. I am hoping to see more utilisation of MWAA in the future and hopefully it will prove to be a good decision to migrate.
