DLT is getting better and better. Thanks for sharing.
While I was searching for whether DLT had merge capabilities, I found the documentation for this feature update but got confused because it was explicitly labeled as being for CDC. I initially thought it was purely for capturing change transactions. I never realized it was just a modified MERGE statement. I'll test this right away.
Thanks for making sense of the documentation for me! Love the Databricks videos.
Simon, I have a question: While SCD Type 1 can achieve similar outcomes, what are the advantages of using the apply_changes function? Does it offer better performance? Additionally, when implementing SCD Type 1 in the silver layer through a materialized view, does the process involve scanning all historical files to identify the latest record? Conversely, when using apply_changes, does it optimize the process by comparing only the latest data in the silver materialized view with the incoming new data?
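To make the SCD Type 1 behaviour in this question concrete, here is a minimal pure-Python model, not the DLT API. It assumes what apply_changes is documented to do for SCD Type 1: match rows on a key, order them by a sequencing column (`sequence_by` in DLT), and keep only the newest version per key. The function name `apply_scd1` and the dictionary layout are hypothetical. The model only touches keys present in the incoming batch. That is the incremental idea the question asks about, but it says nothing about how DLT physically scans files.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def apply_scd1(
    target: Dict[Any, Row],
    changes: Iterable[Row],
    key: str,
    sequence_col: str,
) -> Dict[Any, Row]:
    """Upsert changes into target, keeping only the highest-sequence row per key."""
    # Copy so the caller's target is left untouched.
    result = {k: dict(v) for k, v in target.items()}
    for row in changes:
        current = result.get(row[key])
        # Overwrite only when the incoming row is newer than the stored one,
        # so a late-arriving (out-of-order) change cannot clobber newer data.
        if current is None or row[sequence_col] > current[sequence_col]:
            result[row[key]] = dict(row)
    return result


target = {1: {"id": 1, "name": "Ann", "seq": 1}}
batch = [
    {"id": 1, "name": "Anna", "seq": 3},
    {"id": 1, "name": "Annie", "seq": 2},  # older than seq 3, so ignored
    {"id": 2, "name": "Bob", "seq": 1},
]
latest = apply_scd1(target, batch, key="id", sequence_col="seq")
```

Here `latest` holds "Anna" for id 1, because the seq 2 row arrived late, and a newly inserted "Bob" for id 2.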
Simon - This is indeed a great presentation. Could you point me to the notebooks you referenced here? It would be good to have a look at them. Thanks for the sessions.
One MAJOR issue I'm having is applying changes into an existing table. I have a "full" file that I load in as a table, and then a change data stream writing to another DLT table. I cannot apply changes into the full-file table because it is already defined in the pipeline.
Is there a way to use “apply changes into” so that it only inserts records?
I would like to ignore updates.
Quick question: if a record is dropped from the source table (i.e. a hard delete), how does apply_changes handle it?
I am in a bit of a pinch right now. Based on my understanding of this video, is this new CDC feature solely for capturing changes in the source database? Right now I am rebuilding a pipeline with DLT. Following the logic of the previous version, I have a master table that contains all the previous information, and another table with the new_daily data that is used to update the master table. Both of them exist as a dlt.table in the pipeline, and I am trying to merge them right now without much success. Is this approach incorrect? Or is there a way to merge the two dlt.tables with the CDC feature mentioned above? Any help is appreciated!
I have hit a bit of a conundrum with this feature and wonder if you have an opinion. After running apply_changes() on a table to get the latest version of records that can be frequently updated, this table then becomes a source. I want to add surrogate keys for dimensions that otherwise come across as plain text. When I do this join using a streaming read against the newly created source, Databricks complains that the source data has changed (and it has), with a subsequent warning that this error will keep occurring until the read_stream is changed to a read. This is of course very inefficient, because it forces a complete rewrite of the target table on each execution, and it is not viable for a warehouse environment.
To get around this, I tried writing the surrogate keys to a table upstream of the apply_changes() stage, but the performance of the operation is absolutely terrible despite processing only a few hundred records, which suggests that the joins are very inefficient or that it is processing far more than the few hundred records it should be limited to. The only upside is that it allows the apply_changes() stage to become my gold table.
Have you encountered this and if so, how do you handle it? Kind regards...
Hi Simon,
I got this error, can you please explain it? "module 'dlt' has no attribute 'create_view'"
Thanks. Great video!
Great video.
Could you also share a video on how to create multiple schemas for multiple tables in one notebook and use those schemas in another notebook?
You mentioned "Blank Table" creation for merging into a target table that doesn't exist yet. How are you doing this? I cannot find any examples.
They've changed the function recently; you now use create_streaming_table(). See the docs here: docs.databricks.com/en/delta-live-tables/python-ref.html#create-target-fn
@@AdvancingAnalytics You are great, thank you so much!
The fact that you can't use some kind of mapping, and that both sides must have the same columns instead of allowing mappings and calculated values, is a major shortcoming. Take a PK hash and a change-columns hash, for example. Something like KEYS(Source-Expression = Target-Expression, ... Source-Expression-n = Target-Expression-n), and similar in the WHERE clause...
So now I have to add those columns, along with other taxonomical transforms, to the Bronze / Raw layer (yuck), or else perhaps introduce another 'zone'?
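The "PK hash" and "change-columns hash" mentioned above have to be derived before the merge step. Here is a minimal pure-Python sketch of that derivation. It is not something DLT provides: the column names `pk_hash` and `change_hash`, the SHA-256 choice, the NULL marker and the separator character are all assumptions for illustration. In a pipeline this would be the extra transform the comment is reluctant to push down into Bronze / Raw.

```python
import hashlib
from typing import Any, Dict, Sequence

Row = Dict[str, Any]

NULL_MARKER = "\\N"  # hypothetical stand-in so that None hashes consistently
SEPARATOR = "\x1f"   # ASCII unit separator, unlikely to occur in the data


def row_hash(row: Row, columns: Sequence[str]) -> str:
    """SHA-256 over the selected columns, taken in a fixed order."""
    parts = [NULL_MARKER if row.get(c) is None else str(row[c]) for c in columns]
    # Joining with a separator keeps ("ab", "c") and ("a", "bc") distinct.
    return hashlib.sha256(SEPARATOR.join(parts).encode("utf-8")).hexdigest()


def add_hash_columns(
    row: Row, key_cols: Sequence[str], change_cols: Sequence[str]
) -> Row:
    """Return a copy of row with hypothetical pk_hash / change_hash columns added."""
    out = dict(row)
    out["pk_hash"] = row_hash(row, key_cols)
    out["change_hash"] = row_hash(row, change_cols)
    return out


before = add_hash_columns(
    {"id": 1, "region": "EU", "name": "Ann", "city": "Oslo"},
    key_cols=["id", "region"],
    change_cols=["name", "city"],
)
after = add_hash_columns(
    {"id": 1, "region": "EU", "name": "Ann", "city": "Bergen"},
    key_cols=["id", "region"],
    change_cols=["name", "city"],
)
```

Both rows share a `pk_hash` because they are the same entity, while their `change_hash` values differ because a tracked column changed. That difference is the signal a hash-based merge would use to decide whether to update.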
Hi Simon!
Great video again! Thanks a lot!
Can you share how these “create empty tables” scripts looked before DLT?
Hi. When we do it without DLT, you need a target table with at least the same primary key(s) to compare against.
I also use a command to enable automerge, so I can create a table with only one column and the new columns are added when the merge happens.
Without this command/setting, new columns won't be added automatically. Alternatively, you can create your target table with all columns up front, so you don't have this problem.
The DLT version is a lot easier for sure,
but the manual approach also works well.
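For readers asking what the pre-DLT approach looked like, the reply above describes a MERGE into a target that carries at least the primary key, optionally with schema auto-merge switched on. On Databricks that setting is usually `spark.databricks.delta.schema.autoMerge.enabled`, though it is an assumption that this is the exact "command" the commenter used. The sketch below is a plain-Python model of the behaviour, not Delta code, and `merge_upsert` is a hypothetical name. Matched keys update and unmatched keys insert. New source columns widen the schema only when `auto_merge` is true and are otherwise dropped. That is intended to mirror Delta's documented behaviour for `UPDATE SET *` / `INSERT *` without schema evolution.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]


def merge_upsert(
    target_rows: List[Row],
    target_columns: Sequence[str],
    source_rows: List[Row],
    keys: Sequence[str],
    auto_merge: bool,
) -> Tuple[List[Row], List[str]]:
    """Model of MERGE ... WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT *."""
    missing = [k for k in keys if k not in target_columns]
    if missing:
        # The target must carry the key column(s) so rows can be matched.
        raise ValueError(f"target is missing key column(s): {missing}")
    columns = list(target_columns)
    if auto_merge:
        # Schema evolution: widen the target with any new source columns.
        for row in source_rows:
            for col in row:
                if col not in columns:
                    columns.append(col)
    position = {tuple(r[k] for k in keys): i for i, r in enumerate(target_rows)}
    merged = [dict(r) for r in target_rows]
    for row in source_rows:
        # Without auto_merge, source columns unknown to the target are dropped.
        values = {c: v for c, v in row.items() if c in columns}
        key = tuple(row[k] for k in keys)
        if key in position:
            merged[position[key]].update(values)  # WHEN MATCHED THEN UPDATE SET *
        else:
            position[key] = len(merged)
            merged.append(values)  # WHEN NOT MATCHED THEN INSERT *
    # Columns a row never received are filled with None, i.e. NULL.
    return [{c: r.get(c) for c in columns} for r in merged], columns


source = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
evolved, evolved_cols = merge_upsert([{"id": 1}], ["id"], source, ["id"], auto_merge=True)
fixed, fixed_cols = merge_upsert([{"id": 1}], ["id"], source, ["id"], auto_merge=False)
```

Starting from a one-column table, the auto-merge run ends with an `id, name` schema. The run without it keeps only `id`, which matches the "create your target table with all columns up front" advice.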
One thing I’ve been looking at for a couple of weeks is how this would suit multiple structured streaming tables, since one notebook can only use one Spark pool. Does this in practice mean one Spark pool for every streaming table, or is there a workaround with DLT in some way?
Hey! So you can define several streaming tables within the same notebook, and you can also attach many notebooks to a single DLT pipeline, and it'll pull everything together on the same Spark cluster. It's more a case of deciding how many streams you want running together, and when you want to separate them across clusters for availability/protection.
@@AdvancingAnalytics
Thank you so much for the reply!
I didn't think it was feasible to have 20+ ETL and streaming upserts defined in the same notebook, but if it's possible to attach many notebooks with streaming data (and upserts) to a single DLT pipeline (and it somehow manages to stream them all at the same time), then this is definitely next on my agenda!
Actually, it seems that the concepts of incremental and streaming are easily mixed up.
So when reading from a Kafka CDC feed, would incremental live tables in a DLT pipeline actually apply the changes as fast as a streaming live table? I'm confused about the concepts.
Does this work with schema evolution? For example, on a matched key, will it work if the source data has struct fields that the target does not?
How can I get dropped records in Delta Live Tables?
Thanks for sharing!
Is there an easy way to integrate DLT pipelines with CI/CD?
It will be interesting to play with it and see how we can build an SCD2 table rather than updating existing records.
Hi, we are currently looking at the same issue. Did you find a way?
@@jandrees8190 You can supply the argument stored_as_scd_type = "2" to apply_changes to get this functionality. There is also more hidden functionality in "/databricks/spark/python/dlt/api.py", which you can extract from a DLT flow without much difficulty. Note: it's probably pre-beta and isn't perfect (it only has __START_AT and __END_AT columns).
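As a rough picture of what `stored_as_scd_type = "2"` produces, the sketch below models the history table in plain Python. Each change for a key becomes a version. Its `__START_AT` is its own sequence value, and its `__END_AT` is the next version's sequence value. The current version is left open (`None`). This illustrates the SCD Type 2 pattern under those assumptions and is not DLT's implementation. `build_scd2` and the sample column names are hypothetical, and the model ignores deletes and duplicate sequence values.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def build_scd2(changes: Iterable[Row], key: str, sequence_col: str) -> List[Row]:
    """Turn a change feed into SCD Type 2 history rows with __START_AT / __END_AT."""
    by_key: Dict[Any, List[Row]] = {}
    for row in changes:
        by_key.setdefault(row[key], []).append(row)
    history: List[Row] = []
    for versions in by_key.values():
        # Order each key's versions by the sequencing column, oldest first.
        versions.sort(key=lambda r: r[sequence_col])
        for i, row in enumerate(versions):
            out = dict(row)
            out["__START_AT"] = row[sequence_col]
            # A version is closed by the next version's sequence value;
            # the latest version stays open (None).
            out["__END_AT"] = (
                versions[i + 1][sequence_col] if i + 1 < len(versions) else None
            )
            history.append(out)
    return history


feed = [
    {"id": 1, "city": "Oslo", "ts": 1},
    {"id": 1, "city": "Bergen", "ts": 5},
    {"id": 2, "city": "Rome", "ts": 2},
]
history = build_scd2(feed, key="id", sequence_col="ts")
```

Instead of overwriting Oslo, the model keeps it as a closed version (1 to 5) alongside the open Bergen row, which is the difference from the SCD Type 1 update discussed earlier in the thread.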
Seriously, where do I sign up for your Patreon?
Love your videos. Right now I'm struggling with APPLY CHANGES in both SQL and Python for a very simple SCD1 update... I get the error "detected a update....in source table at version 2.This is currently not supported". I see other people have come across the same error, but I have not seen a cause or a solution for it. Any suggestions would be really, really helpful. Thanks!
So with Delta streaming, by default you can only stream from a table where records have been appended, not updated. That's to avoid re-sending the unchanged records that sit in the same Parquet file as a record that changed. With standard Spark you can force it with the "ignoreChanges" option. I'm guessing that hasn't been implemented in DLT yet, but try adding that option to the source DataFrame and see if it helps!
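The explanation above can be modelled with a toy transaction log. An update rewrites a whole data file, so its commit both removes the old file and adds a new one. The plain-Python sketch below refuses such commits by default. With `ignore_changes=True`, it re-emits the entire rewritten file, which is why downstream consumers can see unchanged rows twice. `Commit`, `stream_rows` and the error text are hypothetical. This is a heavy simplification of the Delta protocol, for intuition only.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Commit:
    """One table version: files added and files removed (rewritten) by it."""

    version: int
    added: Dict[str, List[dict]] = field(default_factory=dict)  # file name -> rows
    removed: List[str] = field(default_factory=list)


def stream_rows(log: List[Commit], ignore_changes: bool = False) -> List[dict]:
    """Emit rows from added files, as an append-only streaming reader would."""
    emitted: List[dict] = []
    for commit in log:
        if commit.removed and not ignore_changes:
            # A removed file means existing data changed: not an append.
            raise RuntimeError(f"update detected in source at version {commit.version}")
        # With ignore_changes, a rewritten file is re-sent in full, so unchanged
        # rows that shared the file with the updated row come through again.
        for rows in commit.added.values():
            emitted.extend(rows)
    return emitted


log = [
    Commit(0, added={"f1": [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]}),
    # Updating id 1 rewrites f1 as f2, carrying the untouched id 2 row along.
    Commit(1, added={"f2": [{"id": 1, "v": "a2"}, {"id": 2, "v": "b"}]}, removed=["f1"]),
]
```

With the default setting, `stream_rows(log)` fails at version 1. Calling `stream_rows(log, ignore_changes=True)` succeeds, but emits the unchanged `{"id": 2, "v": "b"}` row twice.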
@@AdvancingAnalytics Thanks for the quick reply. I think I'm going to have to rethink my ETL, since my single landing file contains multiple entities. But you have certainly made things easier! Thanks again.
@@AdvancingAnalytics I independently found this option and tried it, but it did not fix the problem. The source only appends new files, so something else must be happening at the source that makes Delta streaming think there's a change when there isn't.
@@AdvancingAnalytics Thanks for clarifying this. So updates do not work with the Apply Changes option, and it is not really a typical MERGE scenario. Is my understanding correct?
@@AdvancingAnalytics I got the same error when a new file arrived and there were duplicates in the new files. Is that normal? The error says ignore to true or find a directory... Do you have a contact? I am working with Databricks, thanks. But you can get around this with "refresh all", which is not recommended, lol.
Wonderful article, thank you!