24 Fix Skewness and Spillage with Salting in Spark | Salting Technique | How to identify Skewness

  • Published on 3 Dec 2024

Comments • 39

  • @adulterrier
    @adulterrier 1 month ago

    As always, brilliant and the best on the tube. Small suggestion: we don't need a UDF (everybody hates UDFs), we can simply use the pyspark.sql.functions rand and floor (a fuller both-sides sketch follows this thread):
    from pyspark.sql.functions import concat, lit, floor, rand
    salted_emp = emp.withColumn("salted_dept_id", concat("department_id", lit("_"), floor(rand() * 16)))

    • @easewithdata
      @easewithdata  24 days ago

      Yes, you can definitely use that. I just wanted a simple demonstration of the salting technique.
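
      For anyone following along, here is a minimal sketch of how that salted key would be used on both sides of the join. The DataFrame names emp and dept, the department_id column, and the salt range of 16 come from the video; everything else is illustrative:

      from pyspark.sql.functions import concat, lit, floor, rand, explode, array

      SALT_COUNT = 16  # salt range assumed from the video

      # Fact side: append a random salt (0..15) to the skewed join key
      salted_emp = emp.withColumn(
          "salted_dept_id",
          concat("department_id", lit("_"), floor(rand() * SALT_COUNT))
      )

      # Dimension side: replicate every row once per salt value so every salted key finds a match
      salted_dept = dept.withColumn(
          "salt", explode(array(*[lit(i) for i in range(SALT_COUNT)]))
      ).withColumn(
          "salted_dept_id", concat("department_id", lit("_"), "salt")
      )

      # Join on the salted key; both sides still carry their own department_id column
      joined = salted_emp.join(salted_dept, on="salted_dept_id", how="inner")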

  • @at-cv9ky
    @at-cv9ky 9 months ago

    V nice explanation.

  • @dataworksstudio
    @dataworksstudio 10 months ago

    Great video. Thank you bhaiya.

  • @pramod3469
    @pramod3469 3 months ago +1

    Very well explained, thanks. In this example we have a smaller dept table, so the cross join used for salting doesn't blow up the number of records much. What if the dept table is also huge (both tables are huge)? Second, does salting work on string columns as well (if the joining key is a string)?

    • @easewithdata
      @easewithdata  2 months ago

      In DW concepts, DIM tables are generally smaller than fact tables. But that's a trade-off you have to handle if you want to go for manual salting. And yes, STRING columns can be used for salting.
      If you like my content, please make sure to share it with your network over LinkedIn 👍

    • @adulterrier
      @adulterrier 1 month ago

      @@easewithdata If the DIM table is that small, why not broadcast it? Or wouldn't that solve the skewness?
      Isn't an aggregation a better use case for salting?
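
      For context, the broadcast route suggested above is the usual first choice when the dimension table is small. A minimal sketch, assuming the emp and dept DataFrames from the video: a broadcast hash join copies the small table to every executor and joins the large table map-side, so emp is never shuffled on department_id and shuffle skew does not arise for this join. (Spark also does this automatically for tables below spark.sql.autoBroadcastJoinThreshold, 10 MB by default.)

      from pyspark.sql.functions import broadcast

      # Ship the small dept table to every executor; emp is joined without a shuffle
      joined = emp.join(broadcast(dept), on="department_id", how="inner")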

  • @SivakumarGalaxy-r5g
    @SivakumarGalaxy-r5g 8 months ago

    Mass solutions bruhh🎉

  • @sumeet6510
    @sumeet6510 7 months ago

    Great Video👍

  • @ankitpehlajani9244
    @ankitpehlajani9244 10 months ago

    This is exactly what I was looking for, such a great way to explain. Can you please explain why you chose 16 as the number of partitions?

    • @easewithdata
      @easewithdata  10 months ago +1

      The number of partitions needs to be a multiple of the number of cores. subhamkharwal.medium.com/pyspark-the-factor-of-cores-e884b2d5af6c
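
      As a rough illustration of that rule of thumb (assuming an existing SparkSession named spark; the multiplier of 4 is arbitrary):

      # Size the shuffle partitions as a multiple of the available task slots
      cores = spark.sparkContext.defaultParallelism   # total cores available to the application
      spark.conf.set("spark.sql.shuffle.partitions", cores * 4)   # e.g. 8 cores -> 32 partitions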

  • @akshaykadam1260
    @akshaykadam1260 5 months ago

    great work

    • @easewithdata
      @easewithdata  5 months ago

      Thank you for your feedback 💓 Please make sure to share it with your network over LinkedIn 👍

  • @vishaljoshi1752
    @vishaljoshi1752 9 months ago +1

    Thank you for the explanation. One question about 4:41: is the 39.6 MB the data first loaded into memory, which becomes 79 MB once it is deserialized? Or is it more than that, i.e. is only the data that fits in memory processed at one time, and that 79 MB processed afterwards?

    • @easewithdata
      @easewithdata  7 months ago +1

      Spillage shows the amount of data that did not fit in memory for processing, in both deserialized and serialized form. This data gets processed as soon as Spark can fit it in memory, after some of the data already there has been processed.

    • @vishaljoshi1752
      @vishaljoshi1752 7 months ago

      @@easewithdata Thanks. For example with a sort-merge join: suppose we are joining two tables, and after shuffling one table's shuffle partition is so big that it cannot fully fit in memory. I have seen Spark still manage to join the data with a sort-merge join in such a scenario. How is that possible, given that (as I understood) both tables' partitions have to fit in memory for the join?

  • @Amarjeet-fb3lk
    @Amarjeet-fb3lk 5 months ago +1

    Why did you make 32 shuffle partitions if you have 8 cores?
    If one partition is processed on a single core, where will the remaining 24 partitions get cores from?

    • @easewithdata
      @easewithdata  5 months ago +1

      The 8 cores will process all 32 partitions in 4 waves (8 × 4 = 32).

  • @sanjeevreddy3691
    @sanjeevreddy3691 1 month ago

    1. Do we need to keep the range of random salt numbers and the number of shuffle partitions the same? How did you choose 16?

    • @easewithdata
      @easewithdata  20 days ago

      Just make sure to select a number that is a multiple of the number of cores and doesn't lead to the small-files issue; selecting a very high number can cause that.
      If you like my content, please make sure to share it with your network over LinkedIn 💓

  • @maheshkumarkukkudapu2108
    @maheshkumarkukkudapu2108 2 months ago

    Hi Sir,
    The emp_records_skewed.csv file is missing from the Git repo. Could you please upload it?

    • @easewithdata
      @easewithdata  2 months ago

      Unfortunately the file is too big to upload to Git. You can create the skewed file yourself by appending the same type of data to the file multiple times.

  • @Fullon2
    @Fullon2 11 months ago

    Amazing video.
    Why not repartition by department_id? Isn't that simpler?

    • @easewithdata
      @easewithdata  11 months ago +1

      If we repartition the shuffle partitions after joining, that's again an extra exchange step. And if we try to repartition before the shuffle, the data will again get skewed across the shuffle partitions.
      So it's an optimisation choice that depends entirely on the scenario.

    • @easewithdata
      @easewithdata  11 months ago

      Watch the AQE video for the simplest option to fix skewness (a minimal config sketch follows this thread).

    • @Fullon2
      @Fullon2 11 months ago

      @@easewithdata thanks bro, I'm going to watch.
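
      For reference, the AQE option mentioned in the reply above comes down to a few standard Spark 3.x settings (not specific to this video): with these enabled, Spark detects oversized shuffle partitions at runtime and splits them during sort-merge joins.

      # Let Adaptive Query Execution detect and split skewed shuffle partitions
      spark.conf.set("spark.sql.adaptive.enabled", "true")
      spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
      # Optional knobs controlling what counts as a skewed partition (shown at their defaults)
      spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
      spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")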

  • @montypoddar10
    @montypoddar10 8 months ago

    I couldn't find the emp_skewed_data file in the repo. Can you please share a link to it here or add it to the Git repo?

    • @easewithdata
      @easewithdata  8 months ago

      The dataset is too large to upload to GitHub. I am trying to find an external source to host it.

  • @Rafian1924
    @Rafian1924 3 months ago

    Awesome sir.. why don't you create courses on Udemy?? You have a great ability to explain technical aspects ❤❤

    • @easewithdata
      @easewithdata  3 months ago +3

      Thank you ❤️ I want all good content to be available for free.
      If you like my content, just make sure to share with your network over LinkedIn 👍

    • @Rafian1924
      @Rafian1924 3 months ago

      @@easewithdata definitely sir😊🙏

  • @VikasChavan-v1c
    @VikasChavan-v1c 7 months ago

    If we set the shuffle partitions to 32 and don't use the salting technique, just join on the original department_id column, then what will happen?

    • @easewithdata
      @easewithdata  7 months ago

      The shuffle partition setting doesn't guarantee an even data distribution among executors. To make sure the data is distributed properly, we use the salting technique.
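
      To see why raising the shuffle partition count alone doesn't help, a quick check like the one below (assuming the skewed emp DataFrame from the video) shows that every row of a hot department_id still lands in a single partition, because hash partitioning always maps the same key to the same partition:

      from pyspark.sql.functions import spark_partition_id, count

      # Even with 32 partitions, all rows for a skewed department_id share one partition
      (emp.repartition(32, "department_id")
          .groupBy(spark_partition_id().alias("partition_id"), "department_id")
          .agg(count("*").alias("rows"))
          .orderBy("rows", ascending=False)
          .show())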

  • @Rakesh-q7m8r
    @Rakesh-q7m8r 8 months ago

    Hi Shubham,
    The skewed employee dataset is not available; could you please push it to the Git repo?

    • @easewithdata
      @easewithdata  8 months ago +1

      Hello,
      Unfortunately GitHub is not allowing me to push GB-sized files anymore. I am trying to find another file-sharing service to host the bigger files.

    • @pawansharma-pz1hz
      @pawansharma-pz1hz 8 months ago +1

      @@easewithdata Hi, please create the file at your end using the code below:
      # Read the employee data
      from pyspark.sql.functions import lit, count
      _schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"
      emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/employee_records.csv")
      emp.groupBy("department_id").agg(count("first_name").alias("count")).show()
      # 40-row helper DataFrames with a constant department_id; a left outer join
      # against them repeats the matching department's rows 40 times
      dept_3 = spark.range(40).select(lit(3).alias("department_id_temp"))
      dept_7 = spark.range(40).select(lit(7).alias("department_id_temp"))
      # Inflate department 3, then department 7, checking the counts after each step
      emp_inc_3 = emp.join(dept_3, emp["department_id"] == dept_3["department_id_temp"], "left_outer").drop("department_id_temp")
      emp_inc_3.groupBy("department_id").agg(count("first_name").alias("count")).show()
      emp_inc_7 = emp_inc_3.join(dept_7, emp_inc_3["department_id"] == dept_7["department_id_temp"], "left_outer").drop("department_id_temp")
      emp_inc_7.groupBy("department_id").agg(count("first_name").alias("count")).show()
      # Write out the skewed dataset as a single CSV
      emp_inc_7.repartition(1).write.format("csv").mode("overwrite").option("header", True).save("data/output/emp_record_skewed.csv")

    • @anjibabumakkena
      @anjibabumakkena 8 months ago

      Hi, how can we get the same data to practice with? @@easewithdata

  • @natarajbeelagi569
    @natarajbeelagi569 3 months ago

    Can you please provide the data?

    • @easewithdata
      @easewithdata  3 months ago

      All datasets are uploaded at: github.com/subhamkharwal/pyspark-zero-to-hero/tree/master/datasets