59. Databricks PySpark: Slowly Changing Dimension | SCD Type 1 | Merge using PySpark and Spark SQL

  • Published on 13 Jan 2025

Comments • 120

  • @parameshgosula5510
    @parameshgosula5510 3 years ago +1

    Very informative. Keep going.

  • @sohelsayyad5572
    @sohelsayyad5572 1 year ago +1

    Informative video, and the comment section too.
    Thanks, Raja sir 💐

  • @shubhamalsunde3230
    @shubhamalsunde3230 11 months ago +1

    nice information

  • @tanushreenagar3116
    @tanushreenagar3116 1 year ago +1

    Superb, sir. Now this concept is clear to me.

  • @MrVivekc
    @MrVivekc 2 years ago +1

    Very good content. Keep it up.

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      Thanks Vivek

    • @MrVivekc
      @MrVivekc 2 years ago +1

      @rajasdataengineering7585 I wanted to confirm one thing: is the Delta Lake feature available from Spark 3.x onwards?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      Yes, it is available.

    • @MrVivekc
      @MrVivekc 2 years ago

      @rajasdataengineering7585 OK, and can we implement Delta Lake in Spark 2.3.x?

  • @RadhaJhaq
    @RadhaJhaq 2 years ago +1

    Very well explained

  • @ravinarang6865
    @ravinarang6865 1 year ago +1

    Your videos are nice.

  • @joyo2122
    @joyo2122 2 years ago +1

    Great video for a data scientist like me.

  • @sanjayss8564
    @sanjayss8564 2 years ago +1

    Great explanation 👏👏, thanks

  • @surenderraja1304
    @surenderraja1304 1 year ago +2

    Very nice. Is it possible to supply the column names dynamically? Currently the column in the ON condition is hardcoded as id, and the SET columns are hardcoded too. Can we pull those columns dynamically from a list, an array, or a config file?
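
    One possible way to avoid hardcoding, sketched under assumptions: the merge condition and the column mappings are plain strings and dictionaries, so they can be built from lists that come from a config file. The helper names `build_merge_args` and `scd1_merge`, the aliases `target` and `source`, and the example columns `name` and `city` are hypothetical; only `id` comes from the question.

```python
def build_merge_args(key_cols, update_cols, target_alias="target", source_alias="source"):
    """Build the ON condition and the column mappings for a Delta merge from lists."""
    if not key_cols:
        raise ValueError("at least one key column is required")
    # One equality per key column, joined with AND (this also covers composite keys).
    condition = " AND ".join(
        f"{target_alias}.{c} = {source_alias}.{c}" for c in key_cols
    )
    # Matched rows: overwrite only the non-key columns.
    update_set = {c: f"{source_alias}.{c}" for c in update_cols}
    # Unmatched rows: insert keys and non-key columns.
    insert_values = {
        c: f"{source_alias}.{c}" for c in list(key_cols) + list(update_cols)
    }
    return condition, update_set, insert_values


def scd1_merge(delta_table, source_df, key_cols, update_cols):
    """Run an SCD Type 1 upsert; delta_table is expected to be a delta.tables.DeltaTable."""
    condition, update_set, insert_values = build_merge_args(key_cols, update_cols)
    (
        delta_table.alias("target")
        .merge(source_df.alias("source"), condition)
        .whenMatchedUpdate(set=update_set)
        .whenNotMatchedInsert(values=insert_values)
        .execute()
    )
```

    The lists could be read from JSON or a widget; for example `build_merge_args(["id"], ["name", "city"])` yields the condition `target.id = source.id`.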

  • @abhaybisht101
    @abhaybisht101 3 years ago +1

    Nice content Raja👍

  • @cajaykiran
    @cajaykiran 2 years ago +1

    Thank you

  • @rajeshmanepalli7367
    @rajeshmanepalli7367 9 months ago +1

    In real-time projects, which one should we use, PySpark or SQL? Which one is more effective?

    • @rajasdataengineering7585
      @rajasdataengineering7585 9 months ago

      Both perform at the same level. It is all about the developer's convenience.

  • @rambabuposa5082
    @rambabuposa5082 1 year ago +1

    Hi Raja, nice videos. I have gone through all of your videos.
    You have titled this video SCD Type 1. As far as I know, Delta Lake keeps all kinds of history (versions), so I think it should be SCD Type 2.

    • @rajasdataengineering7585
      @rajasdataengineering7585 1 year ago

      Hi Rambabu, thanks for your comment. I believe I explained the merge statement, which overwrites the previous version of a record and does not maintain history. Anyway, I will check the video and make corrections if needed.
      I have also posted another video on SCD Type 2.

  • @sravankumar1767
    @sravankumar1767 3 years ago

    Nice explanation 👌

  • @awasthi4948
    @awasthi4948 2 years ago +1

    Truly appreciate your efforts!!
    Can you please share the script you used, so that we can try it hands-on?

  • @JL-qc5gq
    @JL-qc5gq 2 years ago +2

    How do we update records in a database table via JDBC in Databricks? I tried read and write (overwrite/append), but not update.

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      You can use the presql and postsql statement options with the JDBC connection.

    • @JL-qc5gq
      @JL-qc5gq 2 years ago +1

      @rajasdataengineering7585 Do you have an example, say, updating records using the presql and postsql options?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      I haven't yet created a video on it

    • @bachannigam4332
      @bachannigam4332 6 months ago

      I have a suggestion: create a stored procedure that deletes the records that are found, call the procedure before the insert, then insert all the values as new values.

  • @ashishsharan6503
    @ashishsharan6503 3 years ago +1

    What is the syntax for manually inserting a record into a Delta Lake table and into a DataFrame using PySpark?

    • @rajasdataengineering7585
      @rajasdataengineering7585 3 years ago +1

      Hi Ashish, it is the same SQL syntax, but use the %sql magic command if you are in a Python or Scala notebook.
      If you want to insert based on a DataFrame, first convert the DataFrame into a temp view, then use the SQL INSERT syntax.
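
      The temp-view approach described in this reply could look roughly like the sketch below. The function name `insert_from_dataframe`, the view name `rows_to_insert`, and any table name passed in are hypothetical; `spark` is assumed to be an active SparkSession and `df` a DataFrame whose columns line up positionally with the target table.

```python
def insert_from_dataframe(spark, df, target_table, view_name="rows_to_insert"):
    """Append the rows of df to target_table through a temp view and SQL INSERT."""
    # Expose the DataFrame to Spark SQL under a temporary name.
    df.createOrReplaceTempView(view_name)
    # INSERT INTO ... SELECT * matches columns by position, not by name.
    statement = f"INSERT INTO {target_table} SELECT * FROM {view_name}"
    spark.sql(statement)
    return statement


# A single manual row can be inserted with plain SQL from a Python cell, e.g.:
# spark.sql("INSERT INTO my_delta_table VALUES (1, 'abc')")  # hypothetical table and values
```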

  • @muvvalabhaskar3948
    @muvvalabhaskar3948 9 months ago +1

    Hi, in this example there is only one table.
    If there are multiple tables with multiple columns, and the primary key is different for each table, how do we generalize this?
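
    A minimal sketch of one way to generalize, assuming each table's key columns are kept in a config and each source is registered as a view named `<table>_updates` with the same columns as its target. The table names, key columns, view naming convention, and helper names are all hypothetical. It generates a Spark SQL MERGE per table and relies on the `UPDATE SET *` / `INSERT *` shorthand that Delta Lake supports.

```python
# Hypothetical table names and key columns, for illustration only.
TABLE_CONFIG = {
    "dim_customer": {"keys": ["customer_id"]},
    "dim_product": {"keys": ["product_id", "region"]},
}


def build_merge_sql(target_table, source_view, keys):
    """Return a Delta MERGE statement that upserts source_view into target_table."""
    on_clause = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    return (
        f"MERGE INTO {target_table} AS t "
        f"USING {source_view} AS s "
        f"ON {on_clause} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


def merge_all(spark, config=TABLE_CONFIG):
    """Run one MERGE per configured table; assumes views named '<table>_updates' exist."""
    for table, settings in config.items():
        spark.sql(build_merge_sql(table, f"{table}_updates", settings["keys"]))
```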

  • @Ek_e_Alfaaz
    @Ek_e_Alfaaz 1 year ago +2

    Sir, please make a playlist on streaming.

  • @kartikeshsaurkar4353
    @kartikeshsaurkar4353 2 years ago +5

    I think the video title should be changed to "How to implement SCD 1 in Databricks". It will reach a larger audience.

  • @kamaltheja
    @kamaltheja 4 months ago

    Can you please share all the notebooks in this series?

  • @MrinalDas3107
    @MrinalDas3107 3 years ago +1

    loved the content. Thanks Brother

    • @rajasdataengineering7585
      @rajasdataengineering7585 3 years ago

      Thank you Mrinal.

    • @MrinalDas3107
      @MrinalDas3107 3 years ago +1

      @rajasdataengineering7585 I am planning to make a transition to Data Engineering and would love to have someone like you guide me in my journey. Could we connect in some way?

    • @rajasdataengineering7585
      @rajasdataengineering7585 3 years ago +1

      Sure, please contact me at audaciousazure@gmail.com

    • @MrinalDas3107
      @MrinalDas3107 3 years ago

      @rajasdataengineering7585 Sure, let me do that right away. Thanks a lot :-)

  • @ashishsharan6503
    @ashishsharan6503 3 years ago +1

    Do we have SCD Type 1 and Type 2 videos in PySpark and Spark SQL?

    • @rajasdataengineering7585
      @rajasdataengineering7585 3 years ago

      Hi Ashish, this tutorial can be used for SCD Type 1. I will post another video for SCD Type 2.

  • @pritamsuryavanshi2582
    @pritamsuryavanshi2582 3 years ago +2

    Could you make a video on "How to implement SCD 2 using PySpark/Spark SQL in Databricks" ? Thanks.

  • @perryliu6573
    @perryliu6573 2 years ago +1

    How do we handle the case where a row in the source table gets deleted and we also want to delete it in the target table?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago +1

      There is an option for whenMatchedDelete as well, but that is for matching cases. In your case, if the source DataFrame contains the latest snapshot, it is better to go for truncate and load.

  • @suhassajjan4056
    @suhassajjan4056 1 year ago +1

    In my case the table is in Hive. Can I implement the same solution?

  • @leviettung2000
    @leviettung2000 2 years ago +1

    Hi Raja,
    I am also doing an upsert with Structured Streaming into Azure SQL Database, and it is not working as it should. I can upload over ODBC on a normal connection, but not in writeStream. The error says ODBC is not installed (but it is). I upsert with forEach.
    Can you give me some advice? Many thanks.

  • @yourtuber6367
    @yourtuber6367 11 months ago

    Please upload a Delta Live Tables series.

  • @sivaani37
    @sivaani37 1 year ago +1

    Can we directly update a table in SQL Server instead of a Delta table?

  • @keerthanavijayakumar9754
    @keerthanavijayakumar9754 11 months ago +1

    Hi sir,
    Is that possible with standalone PySpark? Please explain.

    • @rajasdataengineering7585
      @rajasdataengineering7585 11 months ago +1

      Hi Keerthana, yes, it is also possible with PySpark alone.

  • @krishnamurthy9720
    @krishnamurthy9720 3 years ago

    How do we write the count of inserted records into an audit table?

    • @rajasdataengineering7585
      @rajasdataengineering7585 3 years ago +3

      Hi Krishna,
      Very good question.
      In my example, the Delta table is located at /FileStore/tables/delta_merge.
      After performing the merge operation on this Delta table, you can follow the steps below:
      from delta.tables import DeltaTable
      from pyspark.sql.functions import explode
      delta_df = DeltaTable.forPath(spark, "/FileStore/tables/delta_merge")
      lastOperationDF = delta_df.history(1)  # get the last operation
      display(lastOperationDF)
      # one output row per metric, with columns operation, key, value
      explode_df = lastOperationDF.select(lastOperationDF.operation, explode(lastOperationDF.operationMetrics))
      display(explode_df)
      The operationMetrics column contains all the metrics, including the number of records inserted and the number of records updated.
      explode_df can be used to retrieve these metrics.
      Hope it helps.
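
      Building on the history lookup in this reply, the counts could be pulled into a plain dictionary before being appended to an audit table. This is a sketch under assumptions: the helper name `last_merge_metrics` and the audit table name in the usage comment are hypothetical, and the metric keys shown are the ones Delta Lake reports for MERGE operations (values arrive as strings).

```python
def last_merge_metrics(delta_table):
    """Return row counts from the most recent operation recorded on a Delta table."""
    # history(1) returns a one-row DataFrame; operationMetrics is a map column,
    # which collects into a Python dict of string keys and string values.
    row = delta_table.history(1).select("operation", "operationMetrics").collect()[0]
    metrics = row["operationMetrics"] or {}
    return {
        "operation": row["operation"],
        "rows_inserted": int(metrics.get("numTargetRowsInserted", 0)),
        "rows_updated": int(metrics.get("numTargetRowsUpdated", 0)),
        "rows_deleted": int(metrics.get("numTargetRowsDeleted", 0)),
    }


# Possible usage in a notebook (audit table name is hypothetical):
# audit = last_merge_metrics(DeltaTable.forPath(spark, "/FileStore/tables/delta_merge"))
# spark.createDataFrame([audit]).write.mode("append").saveAsTable("audit.merge_log")
```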

    • @krishnamurthy9720
      @krishnamurthy9720 3 years ago

      @rajasdataengineering7585 Thank you for the instant reply.

    • @rajasdataengineering7585
      @rajasdataengineering7585 3 years ago

      You are welcome

  • @DevelopingI
    @DevelopingI 1 year ago

    Hey, thank you for the video. I am using Method 1 to perform a merge on a big table (1 TB). It takes 3+ hours.
    Can you please suggest how I can improve that?
    Also, is it possible and advisable to perform merges on Parquet rather than converting these to Delta?

  • @kunalmishra348
    @kunalmishra348 1 year ago +1

    Hello,
    Can you please tell me how to change the data type of columns of the created Delta table?
    For example: in this video you have created

  • @ashishsharan6503
    @ashishsharan6503 3 years ago +1

    Where can I get the scripts you have shown in the tutorials? I liked them very much.

  • @gyan_chakra
    @gyan_chakra 2 years ago +1

    Which join is equivalent to merge?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago +2

      Merge is an upsert operation, not a join operation. However, internally it is equivalent to an outer join.

    • @gyan_chakra
      @gyan_chakra 2 years ago +1

      @rajasdataengineering7585 Thank you so much 😊👌

  • @yogeshgavali5238
    @yogeshgavali5238 9 months ago

    How can we delete the data that is not in the source within the same merge statement in PySpark?
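
    One option, sketched under assumptions: newer Delta Lake releases (2.3 and later, and recent Databricks runtimes) add a `whenNotMatchedBySourceDelete` clause to the Python merge builder, so the delete can happen in the same merge. This only makes sense when the source DataFrame is a full snapshot; with an incremental source it would delete every target row that is absent from the batch. The function name and the default key `id` are hypothetical.

```python
def merge_with_source_deletes(delta_table, source_df, key_col="id"):
    """Upsert source rows and delete target rows that no longer exist in the source.

    Assumes Delta Lake 2.3+ and that source_df is a complete snapshot of the source.
    """
    (
        delta_table.alias("target")
        .merge(source_df.alias("source"), f"target.{key_col} = source.{key_col}")
        .whenMatchedUpdateAll()          # SCD Type 1: overwrite matched rows
        .whenNotMatchedInsertAll()       # insert rows that are new in the source
        .whenNotMatchedBySourceDelete()  # delete target rows missing from the source
        .execute()
    )
```

    The Spark SQL counterpart is a `WHEN NOT MATCHED BY SOURCE THEN DELETE` clause in the MERGE statement.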

  • @sanjaynath7206
    @sanjaynath7206 2 years ago +1

    Has the SCD Type 2 video been removed or made private? Could you please make it public? Awesome videos!

  • @keerthanavijayakumar9754
    @keerthanavijayakumar9754 11 months ago +1

    @rajasdataengineering7585 Hi sir,
    I have data in an RDBMS (SQL source).
    I do some transformations and write that data to a Postgres DB using PySpark. The job is triggered on an hourly basis and fetches data from the source in 8-hour intervals, so there are many duplicates in the Postgres table. How do I overcome that? Please explain.

    • @rajasdataengineering7585
      @rajasdataengineering7585 11 months ago +1

      Hi Keerthana, it is better to create a view on the Postgres side with the logic for handling duplicates, and ingest data from the view into Databricks.

    • @keerthanavijayakumar9754
      @keerthanavijayakumar9754 11 months ago

      @rajasdataengineering7585 I need to move that data to the warehouse. Only transformed data can be written, but the existing data and the transformed data contain duplicates. How do I write without entering the same data into Postgres again?
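
      A common pattern for this kind of problem, offered as a sketch rather than the author's method: write each batch to a staging table over JDBC, then let PostgreSQL deduplicate with `INSERT ... ON CONFLICT`. This assumes the target table has a unique constraint on the key columns and that the staging table holds at most one row per key. The helper name and all table and column names are hypothetical, and the statement would be executed with a PostgreSQL client outside Spark's JDBC writer.

```python
def build_postgres_upsert(target, staging, key_cols, other_cols):
    """Return a PostgreSQL upsert that copies staging rows into target without duplicates."""
    cols = list(key_cols) + list(other_cols)
    col_list = ", ".join(cols)
    if other_cols:
        # EXCLUDED refers to the row that was proposed for insertion.
        assignments = ", ".join(f"{c} = EXCLUDED.{c}" for c in other_cols)
        action = f"DO UPDATE SET {assignments}"
    else:
        # Key-only tables: just skip rows that already exist.
        action = "DO NOTHING"
    conflict_target = ", ".join(key_cols)
    return (
        f"INSERT INTO {target} ({col_list}) "
        f"SELECT {col_list} FROM {staging} "
        f"ON CONFLICT ({conflict_target}) {action}"
    )
```

      For example, `build_postgres_upsert("sales", "sales_staging", ["order_id"], ["amount", "status"])` produces a statement that updates `amount` and `status` for existing `order_id` values and inserts the rest.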

  • @midhunssivan139
    @midhunssivan139 1 year ago +1

    Can you please explain how to update an Oracle table from PySpark? I am able to do inserts.

    • @rajasdataengineering7585
      @rajasdataengineering7585 1 year ago

      We need to use the JDBC driver for the Oracle database as well. The process is the same as for the MS SQL table in this example.

    • @midhunssivan139
      @midhunssivan139 1 year ago

      I tried using JDBC. It inserts, but an update statement is not supported; otherwise it overwrites the full data. Is there any way to execute a merge statement on an RDBMS using PySpark?

  • @krishnamurthy9720
    @krishnamurthy9720 2 years ago +1

    I am getting an error when inserting specific columns instead of all columns. It says a column is missing in the insert.

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      For update, you can give specific columns, but for insert the complete list of columns must be provided.

  • @dataanalyst3210
    @dataanalyst3210 6 months ago +1

    Please add one production-ready, live project.

  • @sravankumar1767
    @sravankumar1767 2 years ago +1

    How do we merge Spark DataFrames with complex types? If we have two JSON files and the schema of JSON 1 differs from the schema of JSON 2, how can we merge them using PySpark? Can you please explain this scenario?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago +1

      When you say merge, I assume you mean a union of two DataFrames from JSON files.
      Please let me know if you mean the SQL merge operation, not a union.
      For a union, the number of columns and the data types should match. So you need to alter the DataFrames first to meet these two conditions, and then they can be combined.

    • @sravankumar1767
      @sravankumar1767 2 years ago +1

      @rajasdataengineering7585 Yes, we can use union, but how do we handle complex JSON when the schemas differ, so that we can finally use UNION? Can you please explain this scenario?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago +1

      First, we need to flatten the nested JSON fields, remove unwanted columns, and make the schema the same for both DataFrames.
      Then we can apply the union.
      If you have a sample dataset, you can share it to my mailbox. I can help you.
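
      The flatten-then-union steps in this reply could be sketched as follows. The helper name `flatten_select_exprs` is hypothetical; it walks a DataFrame schema and emits `selectExpr` strings for nested struct fields only (arrays are left as they are, and column names with special characters would need backtick quoting). The union step in the usage comment assumes Spark 3.1+, where `unionByName` accepts `allowMissingColumns=True`.

```python
def flatten_select_exprs(schema, prefix=""):
    """Return selectExpr strings that flatten nested struct fields of a schema."""
    exprs = []
    for field in schema.fields:
        path = f"{prefix}{field.name}"
        if hasattr(field.dataType, "fields"):
            # Struct types expose .fields, so recurse into them.
            exprs.extend(flatten_select_exprs(field.dataType, prefix=f"{path}."))
        else:
            # Leaf column: alias a.b.c as a_b_c.
            exprs.append(f"{path} AS {path.replace('.', '_')}")
    return exprs


# Possible usage with two DataFrames read from JSON files:
# flat1 = df1.selectExpr(*flatten_select_exprs(df1.schema))
# flat2 = df2.selectExpr(*flatten_select_exprs(df2.schema))
# combined = flat1.unionByName(flat2, allowMissingColumns=True)  # missing columns become null
```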

    • @sravankumar1767
      @sravankumar1767 2 years ago

      @rajasdataengineering7585 Can you please share your mail ID?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago +1

      audaciousazure@gmail.com

  • @rohitkumar-nk6sd
    @rohitkumar-nk6sd 2 years ago +1

    Hi, how do I merge on 2 columns?

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      I have posted another video for SCD Type 2 that explains merging on multiple columns. Kindly refer to that video.

    • @rohitkumar-nk6sd
      @rohitkumar-nk6sd 2 years ago +1

      @rajasdataengineering7585
      Hi sir,
      In my case,
      I need to merge on 2 columns with 2 merge keys.
      Please help me out 🙏

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago

      Please refer to this video: th-cam.com/video/GhBlup-8JbE/w-d-xo.html

  • @weldervicentemoreiramartin9467
    @weldervicentemoreiramartin9467 2 years ago

    04:40

  • @shalinikumari-qx9tn
    @shalinikumari-qx9tn 3 years ago

    Please make a video on Delta Lake.

  • @mohanishsingh9786
    @mohanishsingh9786 2 years ago

    Sir, can you share the notebook, please?

  • @tejadeep-u2n
    @tejadeep-u2n 1 year ago

    Can you provide the complete script so we can use it to practice?

  • @AK-ff7xt
    @AK-ff7xt 1 year ago

    Hi,
    I need this notebook. Can you please share it?

  • @DebayanKar7
    @DebayanKar7 1 year ago

    Please add the HTML version of your notebook.

  • @weldervicentemoreiramartin9467
    @weldervicentemoreiramartin9467 2 years ago +1

    Hello, I copied the template from the Databricks documentation and saw that there are some differences from your example. Why doesn't the documentation version work?
    from delta.tables import *
    deltaTableVendas = DeltaTable.forPath(spark, 'dbfs:/mnt/bronze/vendas/')
    deltaTableVendasUpdates = DeltaTable.forPath(spark, 'dbfs:/mnt/silver/vendas/')
    dfUpdates = deltaTableVendasUpdates.toDF()
    deltaTableVendas.alias('vendas') \
      .merge(
        dfUpdates.alias('updates'),
        'vendas.numero_transacao = updates.numero_transacao'
      ) \
      .whenMatchedUpdate(set =
        {
          "numero_transacao": "updates.numero_transacao",
          "numped": "updates.numped",
          "codcli": "updates.codcli",
          "codprod": "updates.codprod",
          "data_venda": "updates.data_venda",
          "quantidade": "updates.quantidade",
          "valor": "updates.valor"
        }
      ) \
      .whenNotMatchedInsert(values =
        {
          "numero_transacao": "updates.numero_transacao",
          "numped": "updates.numped",
          "codcli": "updates.codcli",
          "codprod": "updates.codprod",
          "data_venda": "updates.data_venda",
          "quantidade": "updates.quantidade",
          "valor": "updates.valor"
        }
      ) \
      .execute()

    • @rajasdataengineering7585
      @rajasdataengineering7585 2 years ago +1

      Ideally this code should work as well. I would need to look into the details of the specific error. Could you elaborate if you get any error?

  • @nagamanickam6604
    @nagamanickam6604 9 months ago +1

    Thank you

  • @shalinikumari-qx9tn
    @shalinikumari-qx9tn 3 years ago

    Please make a video on Delta Lake.