Incredible series, thank you Afaque Ahmad. Looking forward to the next videos.😃
Many thanks @Fullon2 for the kind words, really appreciate it! :)
Hi Afaque, great video, and content :). Maybe it may be worth noting in the video the limitations of broadcast joins: the broadcasted dataset needs to fit in the driver & executor memory, and if you have many executors, it may take longer than shuffle merge, it could in fact timeout.
Thanks @miguelruiz9772, for the kind words, and for the feedback, makes sense! :)
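For anyone looking for the knobs behind those limits, here's a minimal sketch (the config names are from the Spark docs; the values are illustrative):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Largest table Spark will auto-broadcast (default 10MB); set to -1 to disable
    .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
    # Seconds executors wait for the broadcast before timing out (default 300)
    .config("spark.sql.broadcastTimeout", "600")
    .getOrCreate()
)
```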
Learnt about AQE today. Thanks for the video.
Great explanation! I can understand SMJ and BCJ much better now. Thanks heaps!
Amazing, you explain everything in great depth in each video.
Amazing content!! Explained so well!
Hello Afaque, thanks for the informative video. What does "partition by join key" mean at 18:38?
Hey @vamsikrishnabhadragiri402, I'm referring to partitioning by `Customers.id` - basically doing a `.partitionBy("id")`. If you were to partition by `Customers.id`, there could be data skew because some customers have many more transactions than others. So some `Customers.id` partition files will have many rows, while others will have very few.
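To make that concrete, a minimal sketch (the `transactions` data and the output path are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical transactions data: customer 1 dominates (skew)
transactions = spark.createDataFrame(
    [(1, 100.0), (1, 250.0), (1, 80.0), (1, 60.0), (2, 40.0), (3, 15.0)],
    ["id", "amount"],
)

# Partitioning the output by the join key: each distinct `id` becomes its own
# directory, so heavy customers produce much larger partition files than others
transactions.write.partitionBy("id").mode("overwrite").parquet("/tmp/tx_by_customer")
```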
I'm not able to see spill details in the Spark 3.5.2 UI?
@Afaque - Thank you so much for the detailed explanation. I need help with the following:
I'm running the job locally, but by the time I check the DAG the job has already completed, so the DAG is no longer visible and I can't visualize anything. Could you please let me know how we can keep the job running so that we can visualize the DAG easily?
Hey @Abhish9221, you can pause the program with something like a `time.sleep()` at the end of the program and then navigate over to the Spark UI.
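A minimal sketch of that idea (the job itself is just a placeholder):

```python
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("dag-demo").getOrCreate()

# Placeholder job - any action that triggers shuffle stages will do
spark.range(1_000_000).selectExpr("id % 10 AS k").groupBy("k").count().show()

# Keep the driver (and the Spark UI at http://localhost:4040) alive
# long enough to inspect the DAG, then shut down cleanly
time.sleep(600)
spark.stop()
```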
@afaqueahmad7117 Thank you so much 🙏
amazing job bro
Really great content, all of your videos. Thank you!! Just had a question out of curiosity - does AQE only coalesce shuffle partitions, or can it, depending on the need, increase the shuffle partitions beyond 200?
Hey @anandchandrashekhar2933, appreciate the kind words. Yes, AQE can do both - increase (split) and decrease (coalesce) the number of shuffle partitions. A clear example is in the Spark DAGs video, where 1 skewed partition was split into 12. Refer here: th-cam.com/video/O_45zAz1OGk/w-d-xo.html
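For reference, a minimal sketch of the AQE knobs behind both directions (the config names are from the Spark docs; the values are illustrative):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.adaptive.enabled", "true")
    # Decrease: merge small shuffle partitions toward this advisory size,
    # so the effective count can drop well below the default of 200
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
    # Increase: split a skewed partition at join time, which can push
    # the count above the configured number of shuffle partitions
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)
```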
@@afaqueahmad7117 Ah, thank you for that. That really made it very clear. For some reason, I couldn't replicate the same when I ran your notebook on Databricks; even though I disabled broadcast hash join, it still ended up using broadcast instead of the AQE coalesce followed by sort merge. Maybe it's something specific to the Spark version I'm currently on. But that's all right. Thank you again :)
Excellent.. Keep going..
Hi sir, great content as always. Just a question on the last part of the video: if I understood correctly, you said to `repartition(3)` the big table so that rows are evenly distributed across the 3 executors, and then apply the broadcast join. But in the code part you only performed a broadcast join without `repartition(3)`. Why is that? I'm a little bit confused about that part. Thanks a lot.
Hey @retenim28, thank you, appreciate it.
On the question - you're correct that I mentioned doing a `repartition(3)` when the table is big so that the rows get evenly partitioned. The reason I don't do a `repartition(3)` in the code is that the sample transactions table I'm using (supposedly the bigger table) isn't very big - hence repartitioning to even out the data isn't needed. Hope that clarifies :)
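A minimal sketch of the two variants being discussed (the DataFrames here are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Illustrative tables: a big fact table and a small dimension table
transactions = spark.range(1_000_000).selectExpr("id % 100 AS customer_id", "id AS amount")
customers = spark.range(100).withColumnRenamed("id", "customer_id")

# What the video's code does: broadcast join only
joined = transactions.join(broadcast(customers), "customer_id")

# What the explanation describes for a genuinely big, uneven table:
# rebalance first, then broadcast join
joined_balanced = transactions.repartition(3).join(broadcast(customers), "customer_id")
```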
@@afaqueahmad7117 this clarifies a lot, thank you. Another question: the `repartition(3)` function involves a shuffle, so theoretically it would be better to avoid it and only use the broadcast join, as you did in the video. So it seems to me there are two possible situations:
1. Do a `repartition(3)` and then the broadcast join: this involves a shuffle (bad) of the big table, but the data skew problem is solved, so each core will process the same amount of data;
2. Avoid `repartition(3)` and go straight to the broadcast join: there is no shuffle (good) of the big table, but a specific core is forced to work with a huge amount of data compared to the remaining two.
Which is the best path?
In your code I tried both options and it looks like it's better to avoid `repartition(3)`. Am I missing something on this point? Sorry about the long question.
Hi Afaque, loving your videos. Great content. Just one doubt: isn't AQE automatically enabled from Spark 3.x? If yes, why do we explicitly need to set the two mentioned properties to true?
Hey @adityeshchaturvedi6553, thanks for the kind words. To answer your question, AQE (`spark.sql.adaptive.enabled`) defaults to false in Spark 3.0.0 (reference here: spark.apache.org/docs/3.0.0/sql-performance-tuning.html#adaptive-query-execution), while it defaults to true from 3.2.0 onwards.
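A quick way to check and, on older versions, enable it on a running session (a minimal sketch):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Resolves to "true" by default from Spark 3.2.0 onwards
print(spark.conf.get("spark.sql.adaptive.enabled"))

# On Spark < 3.2.0, enable it explicitly
spark.conf.set("spark.sql.adaptive.enabled", "true")
```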
@@afaqueahmad7117 thanks a lot.
Really appreciate your efforts !
This is helpful, but I still have a few doubts.
1. If broadcast join is immune to skewness, why is there a salting technique?
2. In the Broadcast join example, the customer dataset appeared to be outside of any executor. Where is it actually stored? How can we specify its storage location?
Broadcast is only possible if the other table is small enough to be replicated
Hi @snehitvaddi,
1. Broadcast join only works in cases where one dataset is small enough to completely fit inside the executor's memory. For cases where both DataFrames are large, broadcasting cannot be used and we have to resort to other options like salting
2. The DataFrame to be broadcasted is initially loaded into the RAM of the driver node. Once the broadcast join is triggered, the driver sends a copy to each executor. The executors store the broadcasted DataFrame in their local in-memory cache for local computation during the join
Hope this clarifies :)
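For reference, a minimal sketch of the salting idea mentioned in point 1 (the column names and salt factor are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, explode, floor, lit, rand

spark = SparkSession.builder.getOrCreate()

SALT = 8  # number of salt buckets; tune to the degree of skew

# Large, skewed side: append a random salt to the join key
big = spark.range(1_000_000).selectExpr("id % 10 AS key", "id AS value")
big_salted = big.withColumn("salt", floor(rand() * SALT))

# Other large side: replicate each row once per salt bucket
other = spark.range(10).withColumnRenamed("id", "key")
other_salted = other.withColumn("salt", explode(array(*[lit(i) for i in range(SALT)])))

# Joining on (key, salt) spreads a hot key across SALT partitions
joined = big_salted.join(other_salted, ["key", "salt"])
```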
Hi @afaque, could you please let me know what you mean by 15 distinct keys in a join?
Hey @suruchijha3914, by 15 distinct keys in a join, I'm referring to 15 unique values in the join column. For example: let's say you're joining sales data with product promotions data on 'product_id', and there are only 15 unique products/product_ids - that means only 15 distinct keys in the join.
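To check this on your own data, a minimal sketch (the DataFrame and column are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative sales data joined on `product_id` - exactly 15 unique products
sales = spark.range(100_000).selectExpr("id % 15 AS product_id", "id AS amount")

# Number of distinct join keys
print(sales.select("product_id").distinct().count())  # 15
```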
Thank you! Afaque Ahmad👍
Just one piece of feedback: at 11:10, your eyes keep darting from your script to the camera. Since this is a recorded video, it's perfectly fine to keep looking at your script throughout. Having your eyes keep changing focus is slightly distracting.
I love your attention to detail; however, there's no script, it's just beginner me trying to adjust myself to the camera. Appreciate your feedback :)
Also, have you actually seen real-life performance improvements from AQE in a pipeline? I always end up setting it to false to avoid unpredictable behavior.
Well, some of its functionality works pretty well, like converting sort merge join to broadcast join and coalescing the number of partitions, but I do agree, others, like optimizing skewed joins, are hard to predict and understand.
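For anyone wanting that middle ground, the individual AQE features can be toggled separately (a minimal sketch; values are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "true")
# Keep the predictable parts on...
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# ...and opt out of the harder-to-predict skew-join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "false")
```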