作者:科技狠活与软件技术 2023-05-25 16:20:03
云计算 本指南探讨了如何在 Kubernetes 上自动扩展您的 Kinesis Data Streams 消费者应用程序,以便您可以节省成本并提高资源效率。 本指南探讨了如何在 Kubernetes 上自动扩展您的 Kinesis Data Streams 消费者应用程序,以便您可以节省成本并提高资源效率。
成都创新互联公司企业建站,十年网站建设经验,专注于网站建设技术,精于网页设计,有多年建站和网站代运营经验,设计师为客户打造网络企业风格,提供周到的建站售前咨询和贴心的售后服务。对于成都网站设计、成都网站建设中不同领域进行深入了解和探索,创新互联在网站建设中充分了解客户行业的需求,以灵动的思维在网页中充分展现,通过对客户行业精准市场调研,为客户提供的解决方案。
想了解如何在 Kubernetes 上自动扩展您的 Kinesis Data Streams 消费者应用程序,以便您可以节省成本并提高资源效率?该博客提供了有关如何做到这一点的分步指南。
通过利用 Kubernetes 自动扩展 Kinesis 消费者应用程序,您可以受益于其内置功能,例如 Horizontal Pod Autoscaler。
Amazon Kinesis是一个用于实时数据处理、摄取和分析的平台。Kinesis Data Streams是一种无服务器流数据服务(Kinesis 流数据平台的一部分,还有Kinesis Data Firehose、Kinesis Video Streams和Kinesis Data Analytics。
Kinesis Data Streams 可以弹性扩展并持续适应数据摄取率和流消费率的变化。它可用于构建实时数据分析应用程序、实时仪表板和实时数据管道。
让我们首先概述 Kinesis Data Streams 的一些关键概念。
Kinesis Client Library 确保有一个记录处理器为每个分片运行并处理来自该分片的数据。KCL通过处理与分布式计算和可扩展性相关的许多复杂任务,帮助您使用和处理来自 Kinesis 数据流的数据。它连接到数据流,枚举数据流中的分片,并使用租约来协调分片与其消费者应用程序的关联。
记录处理器为其管理的每个分片实例化。KCL从数据流中拉取数据记录,将记录推送到相应的记录处理器,检查点处理记录。更重要的是,当工作实例计数发生变化或数据流重新分片(分片被拆分或合并)时,它会平衡分片-工作关联(租约)。这意味着您可以通过简单地添加更多实例来扩展您的 Kinesis Data Streams 应用程序,因为KCL它将自动平衡实例之间的分片。
但是,当负载增加时,您仍然需要一种方法来扩展您的应用程序。当然,您可以手动执行此操作或构建自定义解决方案来完成此操作。
这是Kubernetes 事件驱动的自动缩放(KEDA) 可以提供帮助的地方。是一个基于 Kubernetes 的事件驱动的自动伸缩组件,可以像 Kinesis 一样监控事件源,并根据需要处理的事件数量来KEDA伸缩底层Deployment(和s)。Pod
为见证自动缩放的运行,您将使用一个 Java 应用程序,该应用程序使用 Kinesis Client Library ( KCL) 2.x 使用来自 Kinesis Data Stream 的数据。它将部署到Amazon EKS上的 Kubernetes 集群,并使用KEDA. 该应用程序包括ShardRecordProcessor处理来自 Kinesis 流的数据并将其保存到 DynamoDB 表的实现。我们将使用 AWS CLI 为 Kinesis 流生成数据并观察应用程序的扩展。
之前,我们深入了解,这里是KEDA.
KEDA是一个开源 CNCF 项目,它建立在原生 Kubernetes 原语(例如 Horizontal Pod Autoscaler)之上,可以添加到任何 Kubernetes 集群。以下是其关键组件的高级概述(您可以参考KEDA 文档进行深入研究):
您将看到Kinesis Stream KEDA 缩放器正在运行,它根据 AWS Kinesis Stream 的分片数进行缩放。
现在让我们继续本文的实际部分。
除了 AWS 账户外,您还需要安装AWS CLI、kubectl、Docker、 Java 11 和Maven。
您可以通过多种方式创建Amazon EKS 集群。我更喜欢使用eksctl CLI,因为它提供了便利。使用以下方法创建 EKS 集群eksctl非常简单:
eksctl create cluster --name --region
有关详细信息,请参阅Amazon EKS 入门 – eksctl文档。
创建一个 DynamoDB 表来保存应用程序数据。您可以使用 AWS CLI 通过以下命令创建表:
aws dynamodb create-table \ --table-name users \ --attribute-definitions AttributeName=email,AttributeType=S \ --key-schema AttributeName=email,KeyType=HASH \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
使用 AWS CLI创建一个包含两个分片的 Kinesis 流:
aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
克隆此 GitHub 存储库并将其更改为正确的目录:
git clone https://github.com/abhirockzz/kinesis-keda-autoscalingcd kinesis-keda-autoscaling
好的,让我们开始吧!
出于本教程的目的,您将使用YAML 文件部署KEDA. 但您也可以使用Helm 图表。
安装KEDA:
# update version 2.8.2 if requiredkubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
验证安装:
# check Custom Resource Definitionskubectl get crd# check KEDA Deploymentskubectl get deployment -n keda# check KEDA operator logskubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsnotallow='{.items[0].metadata.name}' -n keda) -n keda
KEDA 操作员和 Kinesis 消费者应用程序需要调用 AWS API。由于两者都将作为 EKS 中的 s 运行Deployment,我们将使用IAM 服务账户角色 (IRSA)来提供必要的权限。
在这种特殊情况下:
将您的 AWS 账户 ID 和 OIDC 身份提供商设置为环境变量:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)#update the cluster name and region as requiredexport EKS_CLUSTER_NAME=demo-eks-clusterexport AWS_REGION=us-east-1OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
JSON为角色创建一个包含可信实体的文件:
read -r -d '' TRUST_RELATIONSHIP < trust_keda.json
现在,创建 IAM 角色并附加策略(查看policy_kinesis_keda.json文件了解详细信息):
export ROLE_NAME=keda-operator-kinesis-roleaws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.jsonaws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy
关联 IAM 角色和服务帐户:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}# verify the annotation kubectl describe serviceaccount/keda-operator -n keda
您需要重新启动KEDA操作员Deployment才能生效:
kubectl rollout restart deployment.apps/keda-operator -n keda# to verify, confirm that the KEDA operator has the right environment variableskubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsnotallow={.items..metadata.name}) | grep "^\s*AWS_"# expected outputAWS_STS_REGIONAL_ENDPOINTS: regionalAWS_DEFAULT_REGION: us-east-1AWS_REGION: us-east-1AWS_ROLE_ARN: arn:aws:iam:::role/keda-operator-kinesis-roleAWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
首先创建一个 Kubernetes 服务帐户:
kubectl apply -f - <
JSON为角色创建一个包含可信实体的文件:
read -r -d '' TRUST_RELATIONSHIP < trust.json
现在,创建 IAM 角色并附加策略(查看policy.json文件了解详细信息):
export ROLE_NAME=kcl-consumer-app-roleaws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.jsonaws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/kcl-consumer-app-policy
关联 IAM 角色和服务帐户:
kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}# verify the annotationkubectl describe serviceaccount/kcl-consumer-app-sa
核心基础设施现已准备就绪。让我们准备并部署消费者应用程序。
您首先需要构建 Docker 镜像并将其推送到Amazon Elastic Container Registry (ECR)(有关Dockerfile详细信息,请参阅 )。
# create runnable JAR filemvn clean compile assembly\:single# build docker imagedocker build -t kcl-consumer-app .AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)# create a private ECR repoaws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.comaws ecr create-repository --repository-name kcl-consumer-app --region us-east-1# tag and push the imagedocker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latestdocker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
更新consumer.yaml以包含您刚刚推送到 ECR 的 Docker 映像。清单的其余部分保持不变:
apiVersion: apps/v1kind: Deploymentmetadata: name: kcl-consumerspec: replicas: 1 selector: matchLabels: app: kcl-consumer template: metadata: labels: app: kcl-consumer spec: serviceAccountName: kcl-consumer-app-sa containers: - name: kcl-consumer image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest imagePullPolicy: Always env: - name: STREAM_NAME value: kinesis-keda-demo - name: TABLE_NAME value: users - name: APPLICATION_NAME value: kinesis-keda-demo - name: AWS_REGION value: us-east-1 - name: INSTANCE_NAME valueFrom: fieldRef: fieldPath: metadata.name
创建Deployment:
kubectl apply -f consumer.yaml# verify Pod transition to Running statekubectl get pods -w
现在您已经部署了消费者应用程序,KCL库应该开始行动了。它要做的第一件事是在 DynamoDB 中创建一个“控制表”——这应该与 KCL 应用程序的名称相同(在本例中为 )kinesis-keda-demo。
进行初始协调和创建表可能需要几分钟时间。您可以检查消费者应用程序的日志以跟踪进度。
kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsnotallow={.items..metadata.name})
租约分配完成后,检查表并记下leaseOwner属性:
aws dynamodb describe-table --table-name kinesis-keda-demoaws dynamodb scan --table-name kinesis-keda-demo
现在,让我们使用 AWS CLI 将一些数据发送到 Kinesis 流。
export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user4@foo.com --data $(echo -n '{"name":"user4", "city":"seattle"}' | base64)
KCL 应用程序将每条记录保存到目标DynamoDB表(在本例中已命名users)。您可以检查表格以验证记录。
aws dynamodb scan --table-name users
注意到processed_by属性的值了吗?它与 KCL 消费者相同Pod。这将使我们更容易验证端到端的自动缩放过程。
这是ScaledObject定义。请注意,它的目标是kcl-consumer Deployment(我们刚刚创建的那个)并且shardCount设置为1:
apiVersion: keda.sh/v1alpha1kind: ScaledObjectmetadata: name: aws-kinesis-stream-scaledobjectspec: scaleTargetRef: name: kcl-consumer triggers: - type: aws-kinesis-stream metadata: # Required streamName: kinesis-keda-demo # Required awsRegion: "us-east-1" shardCount: "1" identityOwner: "operator"
创建KEDAKinesis 缩放器:
kubectl apply -f keda-kinesis-scaler.yaml
Pod我们从我们的一个 KCL 应用程序开始。但是,多亏了KEDA,我们现在应该看到第二次Pod出现了。
kubectl get pods -l=app=kcl-consumer -w# check logs of the new podkubectl logs -f
我们的应用程序能够自动缩放到两个,因为我们在定义中Pods指定了。这意味着Kinesis 流中的每个分片都会有一个。shardCount: "1"ScaledObjectPod
检查kinesis-keda-demo控制表DynamoDB:您应该看到leaseOwner.
让我们向 Kinesis 流发送更多数据。
export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user5@foo.com --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user6@foo.com --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user7@foo.com --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user8@foo.com --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
验证属性的值processed_by。由于我们已经扩展到两个Pods,每个记录的值应该不同,因为每个记录都Pod将处理来自 Kinesis 流的记录子集。
让我们将分片数量从两个扩展到三个,并继续监控KCL应用程序的自动扩展。
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
一旦 Kinesis 重新分片完成,KEDA缩放器将开始行动并将 KCL 应用程序扩展到三个Pods。
kubectl get pods -l=app=kcl-consumer -w
kinesis-keda-demo和之前一样,在控制表中确认Kinesis shard lease已经更新DynamoDB。检查leaseOwner属性。
继续向 Kinesis 流发送更多数据。正如预期的那样,Pods 将共享记录处理,这将反映在表processed_by中的属性中users。
export KINESIS_STREAM=kinesis-keda-demoaws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user9@foo.com --data $(echo -n '{"name":"user9", "city":"new york"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user10@foo.com --data $(echo -n '{"name":"user10", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user11@foo.com --data $(echo -n '{"name":"user11", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user12@foo.com --data $(echo -n '{"name":"user12", "city":"seattle"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user14@foo.com --data $(echo -n '{"name":"user14", "city":"tel aviv"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user15@foo.com --data $(echo -n '{"name":"user15", "city":"new delhi"}' | base64)aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user16@foo.com --data $(echo -n '{"name":"user16", "city":"seattle"}' | base64)
到目前为止,我们只在一个方向上进行了扩展。当我们减少 Kinesis 流的分片容量时会发生什么?亲自尝试一下:将分片数从三个减少到两个,看看 KCL 应用程序会发生什么。
验证端到端解决方案后,您应该清理资源以避免产生任何额外费用。
删除 EKS 集群、Kinesis 流和 DynamoDB 表。
eksctl delete cluster --name keda-kinesis-demoaws kinesis delete-stream --stream-name kinesis-keda-demoaws dynamodb delete-table --table-name users
在本文中,您学习了如何使用KEDA自动缩放KCL使用来自 Kinesis 流的数据的应用程序。
您可以根据您的应用要求配置 KEDA 定标器。例如,您可以将Kinesis 流中的每三个分片设置为shardCount一个3。Pod然而,如果你想维护一个一对一的映射,你可以设置为shardCount并1会KCL处理分布式协调和租约分配,从而确保每个Pod记录处理器都有一个实例。这是一种有效的方法,可让您扩展 Kinesis 流处理管道以满足应用程序的需求。
本文标题:在Kubernetes上自动缩放KinesisDataStreams应用程序
文章路径:http://www.shufengxianlan.com/qtweb/news40/69340.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联