An automated data pipeline using Lambda, S3 and Glue - Big Data with Cloud Computing

  • Published on 11 Sep 2021
  • For more details, refer to this documentation:
    docs.aws.amazon.com/glue/late...
    Steps followed in this use case:
    1) Create the source S3 bucket where the JSON files will land
    2) Create the destination S3 bucket where the CSV output will be written
    3) Create a role for AWS Lambda with CloudWatch, S3, and Glue permissions
    4) Create the Lambda function, which is triggered by the S3 object-create event and in turn starts the Glue job; this Lambda uses the role from step 3
    5) Create a role for the Glue job with CloudWatch and S3 full access
    6) Create the Glue job with the role created in step 5
    Code for AWS Glue:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    def read_nested_json(df):
        # One flattening pass: explode arrays into rows, promote struct fields to columns.
        column_list = []
        for column_name in df.schema.names:
            if isinstance(df.schema[column_name].dataType, ArrayType):
                df = df.withColumn(column_name, explode(column_name))
                column_list.append(column_name)
            elif isinstance(df.schema[column_name].dataType, StructType):
                for field in df.schema[column_name].dataType.fields:
                    column_list.append(col(column_name + "." + field.name).alias(column_name + "_" + field.name))
            else:
                column_list.append(column_name)
        df = df.select(column_list)
        return df

    def flatten(df):
        # Repeat passes until no array or struct columns remain.
        read_nested_json_flag = True
        while read_nested_json_flag:
            df = read_nested_json(df)
            read_nested_json_flag = False
            for column_name in df.schema.names:
                if isinstance(df.schema[column_name].dataType, ArrayType):
                    read_nested_json_flag = True
                elif isinstance(df.schema[column_name].dataType, StructType):
                    read_nested_json_flag = True
        return df

    def main():
        ## @params: [JOB_NAME]
        # VAL1 = object key, VAL2 = bucket name (both passed in by the Lambda function)
        args = getResolvedOptions(sys.argv, ["VAL1", "VAL2"])
        file_name = args['VAL1']
        bucket_name = args['VAL2']
        print("Bucket Name", bucket_name)
        print("File Name", file_name)
        input_file_path = "s3a://{}/{}".format(bucket_name, file_name)
        print("Input File Path : ", input_file_path)
        df = spark.read.option("multiline", True).option("inferSchema", False).json(input_file_path)
        df1 = flatten(df)
        df1.coalesce(1).write.format("csv").option("header", "true").save("s3a://destinationflattenjson/{}".format(file_name.split('.')[0]))

    main()
    Code for AWS Lambda:
    import json
    import boto3

    def lambda_handler(event, context):
        # Object key and bucket name come from the S3 object-create event.
        file_name = event['Records'][0]['s3']['object']['key']
        bucketName = event['Records'][0]['s3']['bucket']['name']
        print("File Name : ", file_name)
        print("Bucket Name : ", bucketName)
        glue = boto3.client('glue')
        # Start the Glue job, passing the key and bucket as job arguments.
        response = glue.start_job_run(JobName="s3_lambda_glue_s3", Arguments={"--VAL1": file_name, "--VAL2": bucketName})
        print("Lambda Invoke ")
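
The handler above indexes `event['Records']` directly, so any event that is not an S3 notification (for example, a hand-written test event) raises `KeyError: 'Records'`. Object keys in S3 notifications are also URL-encoded, so a space in a file name arrives as `+`. Below is a minimal sketch of a more defensive parser; the helper name `extract_s3_objects` is hypothetical and not part of the original code.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event):
    """Return (bucket, key) pairs from an S3 event; empty list if there are none."""
    pairs = []
    # .get() avoids a KeyError when the event has no "Records" entry.
    for record in event.get("Records", []):
        s3_info = record.get("s3")
        if not s3_info:
            continue  # not an S3 record, skip it
        bucket = s3_info["bucket"]["name"]
        # Keys are URL-encoded in the notification payload, so decode them.
        key = unquote_plus(s3_info["object"]["key"])
        pairs.append((bucket, key))
    return pairs
```

Inside `lambda_handler`, one could loop over `extract_s3_objects(event)` and call `glue.start_job_run` once per pair, returning early when the list is empty.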
    Check this playlist for more AWS projects in the Big Data domain:
    • Demystifying Data Engi...
  • Science & Technology

Comments • 77

  • @parthakaizer 2 years ago +3

    Please keep uploading videos, you don't know how many lives you are touching man!

    • @KnowledgeAmplifier1 2 years ago

      Thank you Partha Sarathi Barman, I will! Happy Learning :-)

  • @radhikaa5462 2 years ago +3

    Very good informative tutorial, superb explanation of code and ETL pipeline ! I worked on Lambda before and this video helped me to work on Glue integrating with S3 and Lambda. Thank you !

    • @KnowledgeAmplifier1 2 years ago +1

      Glad to know the video was helpful to you radhika a! Happy Learning :-)

  • @Alexr26 1 year ago +2

    Awesome tutorial, I just used this information to create my first automated pipeline. Thanks a lot for sharing!

  • @parthakaizer 2 years ago +2

    I just had a requirement of this same solution. Thank you so much! You are a lifesaver!

    • @KnowledgeAmplifier1 2 years ago

      Glad it helped Partha Sarathi Barman! Happy Learning :-)

    • @XoXo-ou9yu 2 years ago +1

      @KnowledgeAmplifier1 The script creates a Spark session every time a file is dropped in the S3 bucket. This is making the Glue ETL extremely slow. How do we take care of that?

    • @KnowledgeAmplifier1 1 year ago

      @XoXo-ou9yu To avoid triggering 100 times when 100 files are uploaded, designate one special key (file name) or prefix to be used as the trigger. Once all the required files are written to S3, write the special file, and configure the S3 event notification on that file only. That way Lambda does not trigger multiple times; it starts the AWS Glue job only once (when the special file is written).
      You can check this video for details --
      th-cam.com/video/otm7Nbmvy3E/w-d-xo.html
      Hope this will be helpful! Happy Learning :-)
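
The marker-file idea in the reply above can also be enforced inside the Lambda function. Here is a minimal sketch, assuming the marker is named `_SUCCESS` (a hypothetical choice; any agreed file name works) and is written last into the same folder as the data files:

```python
MARKER_NAME = "_SUCCESS"  # hypothetical marker file name, an assumption


def is_trigger_key(key, marker_name=MARKER_NAME):
    """True only when the object key is the marker file itself."""
    return key.rsplit("/", 1)[-1] == marker_name


def batch_prefix(key):
    """Folder (prefix) that the marker file closes, e.g. 'in/2021-09-11/'."""
    head, sep, _ = key.rpartition("/")
    return head + sep
```

The handler would start the Glue job only when `is_trigger_key(key)` is true and pass `batch_prefix(key)` as the input location, so the job reads every file under that prefix in one run. S3 event notifications also accept prefix and suffix filter rules, which can do the same filtering before Lambda is invoked at all.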

  • @gouthammadarapu8430 9 months ago +1

    Nice and clearly explained bro 👌

  • @Truth___ 2 years ago

    Very informative video 👍

  • @AliciaMarkoe 5 months ago +1

    Thank you 🦋

    • @KnowledgeAmplifier1 5 months ago

      You are welcome Alicia Markoe! Happy Learning

  • @sriadityab4794 2 years ago +1

    Thank you for the informative tutorial. Suppose I have files landing in different folders of an S3 bucket, with date partitions. How can we write the Lambda code to trigger the Glue job? I want the Glue job to join all the different source files, which means the job should not be triggered until all the files have landed in their date-partitioned folders in S3. Thanks

  • @sagar7958 1 year ago +1

    Dhanyawad :)

  • @RajeshKumar-re8tj 10 months ago +2

    Hey, please make an AWS-based course of 10-20 projects for data engineers. I am interested in your teaching style.

    • @KnowledgeAmplifier1 8 months ago

      Hello Rajesh Kumar , Thank you for your interest in my teaching style. I'm delighted to hear that you're interested in an AWS-based data engineering course. I'd like to let you know that I've already created a comprehensive Medium blog which covers a range of projects for data engineers with in-depth explanations. You can access it to explore these projects and learn at your own pace : medium.com/@satadru1998/7-end-to-end-modern-data-engineering-projects-for-free-3c1c5f09d89e .
      If you have any questions or need further guidance, please feel free to reach out!

  • @pinakisaha8179 1 year ago

    Thank you so much for this great video. I was able to execute it successfully, but the destination file contains only 2 rows of data instead of all of them. What could be the reason? Do I need to modify the Glue code? Please advise. Thanks again

  • @krishnasanagavarapu4858 2 years ago +1

    awesome

  • @sudipbala9647 2 years ago

    Thank you.

  • @sriadityab4794 2 years ago

    How do we handle multiple files being dropped in S3 at the same time when we need to trigger only one Glue job? How should we handle Lambda here? Any help is appreciated.

  • @vikinist 1 year ago

    Can't we use a Glue job (data target) directly to convert JSON to CSV?

  • @durgarasane-kolapkar1842 8 months ago

    Thanks for the tutorial, Sir! If we get a burst of files in S3, there will be multiple Lambda invocations, and each one will also fire the Glue job. How do we handle this scenario? Should we increase the concurrency of the Glue job? If yes, how much concurrency should we set, given that we are not sure how many files will arrive in S3 at a time? Please advise

  • @niranjanjamkhande3773 2 years ago +1

    Great video. Thanks a lot.
    If the data is distributed across different folders in S3, or lands in various folders in a single bucket, how do we modify the code and use a particular folder as the trigger?
    Please help.
    I also tried nested JSON with a variety of data types (int, struct, array). In that case I could not get the CSV-converted table in the destination bucket. It works only when the data types are only array or only struct.

  • @sayalibendre4543 2 years ago

    What can be done if we do not want the output file in the transformed bucket to have the default part-... name?
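
Spark chooses the `part-...` file names itself, so one common workaround is to rename the object after the job has written it. Below is a minimal sketch of that idea, not something shown in the video. The function name is hypothetical, `s3_client` is assumed to be a `boto3.client("s3")` object, and the job is assumed to have written exactly one part file (as `coalesce(1)` does) under `prefix`.

```python
def rename_single_part_file(s3_client, bucket, prefix, target_key):
    """Copy the lone part-* object under `prefix` to `target_key`, then delete it."""
    listing = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    part_keys = [
        obj["Key"]
        for obj in listing.get("Contents", [])
        if obj["Key"].rsplit("/", 1)[-1].startswith("part-")
    ]
    if len(part_keys) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_keys)}")
    source_key = part_keys[0]
    # S3 has no rename operation, so copy to the new key and delete the old one.
    s3_client.copy_object(
        Bucket=bucket,
        Key=target_key,
        CopySource={"Bucket": bucket, "Key": source_key},
    )
    s3_client.delete_object(Bucket=bucket, Key=source_key)
    return source_key
```

This could run at the end of the Glue script or in a follow-up Lambda, for example `rename_single_part_file(boto3.client("s3"), "destinationflattenjson", "sample/", "sample.csv")`, where the prefix and target key are illustrative values.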

  • @datapragmatic 7 months ago

    Hi, why do you use a Lambda to trigger the Glue job? Can't you use Glue triggers?

  • @monamidatta1611 11 months ago

    Can I make Lambda trigger the Glue job at a particular time? I don't want it to trigger when a file lands, because my files land throughout the day, so recursion can happen. Please help

  • @AA-gp2vv 1 year ago +1

    Thanks

  • @Arvindkumar-mb8yj 1 year ago

    Can we also load the following type of CSV file from S3 to Snowflake?
    The CSV file has some comments in the first 2-3 lines, then the header starts, and at the end there is one comment line giving the total number of records in the file. How do we crawl such a CSV file and load it into Snowflake?

  • @DiaaKasem0 2 years ago +1

    Thank you so much for this great video... But, the question that I have in mind, is for this use case you showed, wouldn't just using Lambda alone to make the transformation be sufficient ? .. no need to trigger Glue .. just read json from s3, flatten, write to output s3 ... or what is wrong with this idea ? .. and if you know, when what I am suggesting fails ? ... and when Glue will be really needed ? ( I expect in processing all existing files in a bucket ... or can we use lambda for that ? ) Thank You :) again

    • @KnowledgeAmplifier1 2 years ago +1

      I am using Spark to flatten the JSON, and you cannot run PySpark in Lambda, so I used Glue. If the file is small, you can flatten the JSON with native Python too, but if the file is too large to process within 15 minutes, you have to go with Glue, as a Lambda invocation has a maximum run time of 15 minutes!
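
For the small-file case mentioned above, the same flattening idea can be sketched in native Python with only the standard library. This is an illustrative sketch, not the code from the video: the function names are hypothetical, struct fields become `parent_child` columns, and each array element becomes its own row (an empty array drops the row, as Spark's `explode` does).

```python
import csv
import io
import json


def _expand(name, value):
    """Return the list of flat row fragments produced by one field."""
    if isinstance(value, dict):
        return flatten_record(value, name)
    if isinstance(value, list):
        # One fragment per array element (nested arrays are expanded too).
        return [row for item in value for row in _expand(name, item)]
    return [{name: value}]


def flatten_record(record, parent_key=""):
    """Flatten one JSON object into a list of flat dicts, one per output row."""
    rows = [{}]
    for key, value in record.items():
        name = f"{parent_key}_{key}" if parent_key else key
        # Combine every row built so far with every fragment of this field.
        rows = [{**row, **part} for row in rows for part in _expand(name, value)]
    return rows


def json_to_csv(text):
    """Convert a JSON document (object or list of objects) to CSV text."""
    data = json.loads(text)
    records = data if isinstance(data, list) else [data]
    rows = [row for record in records for row in flatten_record(record)]
    fieldnames = []
    for row in rows:
        for name in row:
            if name not in fieldnames:
                fieldnames.append(name)
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```

In a Lambda function, the input text would come from reading the S3 object and the returned CSV text would be written to the destination bucket; both steps are left out here to keep the sketch self-contained.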

    • @DiaaKasem0 2 years ago +1

      @KnowledgeAmplifier1 Thank you so much :) I have the case where I have millions of small files, not big ones, so I always wondered why people everywhere used PySpark when Lambda can do it. Now I understand

  • @nmadhavirao 2 years ago +1

    @Knowledge Amplifier Thank you for the video, it's very informative. I have one question: in a real project, we would not use the AWS console to develop scripts. How would you write all of this in Python code (including the manual settings you did by hand, like assigning the AWS Glue role), and how and where in AWS would you deploy the code?

    • @KnowledgeAmplifier1 2 years ago +1

      You can use AWS CLI or boto3
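
As a minimal sketch of the boto3 route mentioned above, the Glue job from step 6 could be defined in code instead of the console. The helper name, the role ARN, and the script path are hypothetical placeholders; the dictionary keys are parameters of the boto3 Glue client's `create_job` call.

```python
def build_glue_job_definition(job_name, role_arn, script_s3_path, max_concurrent_runs=1):
    """Build keyword arguments for the boto3 Glue client's create_job call."""
    return {
        "Name": job_name,
        "Role": role_arn,  # IAM role created for the Glue job (step 5)
        "Command": {
            "Name": "glueetl",  # Spark ETL job type
            "ScriptLocation": script_s3_path,  # where the job script is stored in S3
            "PythonVersion": "3",
        },
        "ExecutionProperty": {"MaxConcurrentRuns": max_concurrent_runs},
    }
```

With AWS credentials configured, the job would then be created with `boto3.client("glue").create_job(**build_glue_job_definition("s3_lambda_glue_s3", role_arn, script_path))`, where `role_arn` and `script_path` are values from your own account.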

    • @nmadhavirao 2 years ago

      @KnowledgeAmplifier1 thank you

  • @nmadhavirao 2 years ago +1

    @Knowledge Amplifier I have a question. I thought when we use Glue we have to crawl to create data catalog. In above example you did not do it. When do we crawl vs when do we not?

    • @KnowledgeAmplifier1 2 years ago +1

      The Glue crawler is optional. If you want to infer the schema from the source system, create a table in the Glue Data Catalog, and then use that table in the Glue job, go with a crawler. If instead you are writing plain Spark code that can run anywhere (on-prem, EMR, or Glue), write generalized code like I did and use Glue as a platform for running lightweight ETL 😊 Hope that makes sense. Also remember that a Glue crawler cannot connect to every source system to crawl its data.
      Happy Learning 😊

    • @nmadhavirao 2 years ago

      @KnowledgeAmplifier1 Thank you so much for the reply. I have one more question. In a real project, we would not use the AWS console to develop scripts. How would you write all of this in Python code (including the manual settings you did in the video, like assigning the AWS Glue role), and how and where in AWS would you deploy the code?

    • @aniruddhyadav4794 1 year ago

      @nmadhavirao In a real project you can use CloudFormation templates and CodePipeline instead of the console

  • @mayanktripathi4u 2 years ago +3

    Thanks for this video, it's really helpful. Could you please also create a video on AWS Data Pipeline, maybe for a requirement similar to the one accomplished in this video with AWS Glue? I am actually facing an issue with AWS Data Pipeline and custom scripts.

    • @KnowledgeAmplifier1 2 years ago +3

      Sure Mayank Tripathi , will upload soon , Stay tuned , Happy Learning :-)

    • @manojt7012 2 years ago +2

      @KnowledgeAmplifier1 Hi bro. Can you make a video on when to use Glue and when to use EMR, as both serve a similar purpose? Maybe Glue costs more and is serverless with less bootstrapping time. Is there anything else to consider?

    • @KnowledgeAmplifier1 2 years ago +3

      @manojt7012 We should use Glue for low/medium-complexity ETL and EMR for high-complexity ETL.
      If the data is in the TB range, the transformation is not very heavy, the process can complete within 1-2 hours, and the job runs very frequently, then you can go with AWS Glue.
      If the ETL is highly complex, the data size is in the petabyte range (which is quite huge), and the job takes 6-7 hours to complete, then EMR is preferred over Glue (Glue may run out of memory, whereas EMR's autoscaling avoids that).
      Hope this gives some idea of when to use EMR and when to use Glue.
      Happy Learning :-)

    • @manojt7012 2 years ago

      @KnowledgeAmplifier1 Thanks a lot for the clear explanation.

  • @parthakaizer 2 years ago +1

    How do I write from Glue to a Redshift table? Do you have a video on that already?

    • @KnowledgeAmplifier1 2 years ago

      You can check this -- docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-redshift.html Hope this will be helpful.. Happy Learning :-)

  • @parthasarathibarman9862 2 years ago

    Hi Buddy, one more question: I want to write clean, production-quality code. Where can I practice? Is there a course? Also, where can I practice data engineering projects using Python? Any links to genuinely good courses, free or paid, will be really appreciated. Thanks for your help in advance!

  • @XoXo-ou9yu 2 years ago +1

    Your script creates a Spark session every time a file is dropped in the S3 bucket. This is making the Glue ETL extremely slow. How do we take care of that? @Knowledge Amplifier

    • @KnowledgeAmplifier1 1 year ago +1

      To avoid triggering 100 times when 100 files are uploaded, designate one special key (file name) or prefix to be used as the trigger. Once all the required files are written to S3, write the special file, and configure the S3 event notification on that file only. That way Lambda does not trigger multiple times; it starts the AWS Glue job only once (when the special file is written).
      You can check this video for details --
      th-cam.com/video/otm7Nbmvy3E/w-d-xo.html
      Hope this will be helpful! Happy Learning :-)

    • @XoXo-ou9yu 1 year ago

      @KnowledgeAmplifier1 Thanks a lot!

    • @XoXo-ou9yu 1 year ago

      @KnowledgeAmplifier1 I used the script provided in the description, but it parses only the first line of the JSON file and ignores the rest. Any idea why? In CloudWatch I can see just a warning message: "jndi lookup class is not available because this jre does not support jndi"

  • @saicharanpeddireddy5859 1 year ago +1

    How can we develop CI/CD for this exact process?

    • @KnowledgeAmplifier1 5 months ago

      Hello Saicharan Peddireddy 585, the process is explained in my recent video. You can have a look if you are still looking for the answer -- th-cam.com/video/N5Z_M1HWTMA/w-d-xo.htmlsi=q_HG4aKkyln0zaM0 Hope this will be helpful! Happy Learning

  • @ardavanmoinzadeh801 1 year ago +1

    Where is the architecture diagram for this project?

    • @KnowledgeAmplifier1 1 year ago +1

      Hello Ardavan Moinzadeh , you can have a look in this link --
      github.com/SatadruMukherjee/Data-Preprocessing-Models/blob/main/AWS%20Glue%20Job%20trigger%20from%20Lambda.JPG
      Hope this will be helpful! Happy Learning

  • @veerachegu 2 years ago +1

    Is anyone here able to help me? I have a project on AWS Glue

  • @DanielWeikert 2 years ago

    I tried the same, but converting from CSV to Parquet.
    I receive the following error: Glue job 99.pyWriteDynamicFrame. Illegal empty schema
    Any ideas?
    Thanks

  • @santoshmehta1999 1 year ago

    I am getting this error
    {
    "errorMessage": "'Records'",
    "errorType": "KeyError",
    "requestId": "caf276e6-da0a-44df-abdf-f62aec50c3ba",
    "stackTrace": [
    " File \"/var/task/lambda_function.py\", line 6, in lambda_handler
    file_name = event['Records'][0]['s3']['object']['key']
    "
    ]
    }

    • @santoshmehta1999 1 year ago +1

      Got the solution

    • @vaibhavverma1340 10 months ago

      @santoshmehta1999 May I know how?