Thank you for this! The flatten hierarchy behavior in the ADF Copy data activity was not working, and this is such an elegant alternative solution in Databricks! Just what I needed. I appreciate you sharing it.
Thank you Chris!
Very educative; thanks for making this video.
I'm glad you found it helpful! Keep watching
Thanks!! This is exactly what I needed, such excellent work!!!
Thank you!
Excellent piece of information. I have the reverse scenario: is there a way to unflatten the data back to its original structure? Any pointer, reference, or sample code would be really helpful.
Hey Raja, thanks for your superb real-time interview series. I cleared the interview process for KPMG Global Services as an Azure data engineer.
Wow, what great news! Thanks for sharing.
All the best!!
Hi Atharva, can we connect? I am also preparing for the interview and need some help.
Let me know where we can connect.
Brilliant and awesome tutorial. That's what we need!
Thank you
Excellent and very neat explanation. Thanks for sharing your wonderful knowledge ❤❤
Glad you liked it!
Thank you for this video, this is wonderful. It's going to help many people.
Great video, man. That is what I was looking for.
Hi, thank you for your valuable teaching.
It would be helpful if you could share the notebooks and other resources.
Thank you so much. This function saves me lots of time.
Glad to know that it helps you!
You are an angel brother..
Thank you
Very useful content. Thank you!
Thanks for your comment!
This is excellent. Thank you very much for providing this JSON parsing solution, which will help in many JSON scenarios. 😍
Glad it was helpful!
Thanks very much for the tutorial :). I have a query regarding reading JSON files.
I have an array of structs where each struct has a different structure/schema.
Based on a certain property value of the struct, I apply a filter to get that nested struct. However, when I display it using printSchema, it contains fields that do not belong to that object but are somehow being associated with it from the schemas of the other structs. How can I possibly fix this issue?
This is so useful; thanks for sharing.
Great videos. They saved me a lot of time. Thank you so much. :)
Thank you Bhanu Teja
Thank you, thank you, thank you! You saved me!
Glad to know it helped you👍🏻
Hi Raja sir, I used this function to flatten a deeply nested JSON, but it is throwing the error "No such struct field com in ACID....". When I went through the schema of the nested JSON, I saw the cookies column (which is of struct type) has one field 'com.sx.attr': string (nullable = true). Please help.
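(A note for anyone hitting the same error: it usually means a field name itself contains dots, such as com.sx.attr, so the dot-separated reference built inside the flatten function gets split at the wrong place. A minimal sketch of the usual workaround, using getField so the dots are treated as part of the name; the cookies column and field name are taken from the comment above:)
from pyspark.sql.functions import col
# getField treats "com.sx.attr" as one literal field name, not as nesting
df = df.withColumn("cookies_com_sx_attr", col("cookies").getField("com.sx.attr"))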
Hi Raja, thank you so much for your videos. Really helpful. Can you also make a video on CI/CD integration of notebooks and promoting notebooks to higher environments?
Hi Suresh, sure, I will create a video on CI/CD soon.
Sir, I have a situation where I need to define the schema without using the explode, array, or flatten functions.
However, when applying it, I get a message saying a string is expected but the schema is an array. Would you help me?
root
|-- text: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Id: long (nullable = true)
| | |-- subtext: string (nullable = true)
| | |-- subtext2: string (nullable = true)
| | |-- subtext3: string (nullable = true)
| | |-- subtext4: string (nullable = true)
schema = StructType(
[
StructField("text", ArrayType(
StructType(
[
StructField("id", IntegerType(), True),
StructField("subtext", StringType(), True),
StructField("subtext2", StringType(), True),
StructField("subtext3", StringType(), True),
])
))
]
)
df1 = df.withColumn("text", from_json(col("text"), schema))
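(A note on the error above: from_json only accepts a string column, and the text column here has already been parsed into an array by the JSON reader, hence "expecting a string". If the goal is just to enforce a known schema without explode/flatten, one sketch under that assumption is to supply the schema at read time instead; the file path is a placeholder, and the field names follow the printSchema output above:)
from pyspark.sql.types import StructType, StructField, ArrayType, LongType, StringType

schema = StructType([
    StructField("text", ArrayType(StructType([
        StructField("Id", LongType(), True),
        StructField("subtext", StringType(), True),
        StructField("subtext2", StringType(), True),
        StructField("subtext3", StringType(), True),
        StructField("subtext4", StringType(), True),
    ])), True)
])

# the schema is applied while reading, so no from_json call is needed
df = spark.read.schema(schema).json("/path/to/file.json")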
Usually I don't comment on videos. Awesome work, bro.
Thanks bro👍🏻
Thank you Raja, you saved my day!!
Welcome Alex
Hi Raja, thanks for the solution. I have implemented the same; however, I am getting "Ambiguous reference to fields", i.e., the column names are identical. I have enabled the case-sensitive config, but it is not working.
@tejasnaik6724 It is not repeating; there is a "_" separating it. Use this function to remove the prefix. Raja's surprising us every day.
df1 = flatten(df)
column = [i if len(i)
Thanks for sharing
Great video! Very informative.
Thank you!
Hi Raja, this is a wonderful video. I have a quick question: can the JSON be flattened into multiple tables?
Hi Sharath, you can first create one dataframe by flattening the JSON, and then split that dataframe into multiple tables based on your business requirement (see the sketch below).
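(A minimal sketch of that split, assuming a flattened dataframe with hypothetical customer and order columns; flatten() is the function from the video:)
flat_df = flatten(df)

# carve the wide, flattened dataframe into subject-area tables
customers_df = flat_df.select("customer_id", "customer_name").dropDuplicates()
orders_df = flat_df.select("customer_id", "order_id", "order_amount")

customers_df.write.mode("overwrite").saveAsTable("customers")
orders_df.write.mode("overwrite").saveAsTable("orders")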
Thanks for the video, and for sharing the very useful function.
Thanks Vipin. Glad it was helpful!
Wow, thanks for the solution.
Glad it helped
Thank you for the great video! 👌
Thank you for watching
What if I create a schema and then try to bind it to the JSON data, provided I already know the JSON structure?
Thank you so much! Very useful function 👌
Glad it helped!
Hey Raja, I have a doubt.
I ran the same code in Google Colab and it parsed my JSON, but when I run the same code in an Azure Synapse notebook, it shows only one null row. Can you tell me why this happens?
This is not working for columns with null values while writing to a folder. Could you help me? I mean, if one of the column values in the JSON is null, then while writing the flattened JSON using
Df_flatten.write.json(path), the column that is null is not written. I have been trying to fix this issue for a long time.
You can compare the schema before writing the data. If columns are missing because of null values, you can populate them before writing.
Let's say you want 5 columns in your JSON file, such as emp_id, name, age, dob, doj. But in your dataframe, age is null, so it holds only 4 columns: emp_id, name, dob, doj. In this case, you need to populate the missing column before writing.
You can use this workaround:
Step 1: Create a helper function to populate missing columns
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
def add_missing_cols(missing_df):
    for column in missing_columns:
        missing_df = missing_df.withColumn(column, lit(None).cast(StringType()))
    return missing_df
Step 2: Compare the actual and expected schemas and identify the missing columns
exp_schema = ["emp_id", "name", "age", "doj", "dob"]
actual_schema = df.columns
missing_columns = set(exp_schema).difference(actual_schema)
Step 3: Populate the missing columns using the function from step 1
df_full_schema = add_missing_cols(df)
Now you can write the dataframe df_full_schema, which should contain the null columns as well.
How can one substitute posexplode_outer for explode_outer in the code, in order to get both the position and the column value? I tried df=df.selectExpr("*", posexplode_outer(col_name).alias("position",col_name)) but am getting the error "TypeError: Column is not iterable".
The column on which you apply positional explode should be of either array or map type. I think the one you applied it to is neither an array nor a map field.
@@rajasdataengineering7585 I resolved it by using select with col(col_name) instead of selectExpr. Here is the code: df = df.select(col("*"), posexplode_outer(col(col_name)).alias(col_name + "_pos", col_name + "_values")).drop(col_name)
Thanks Raja for the suggestion.
Welcome
Really great. Is there anything available for converting tabular data to a complex JSON file?
Yes Akhilesh, we can do that as well using PySpark functions. If needed, I will post a video.
Hi Raja, thanks for this wonderful video. After flattening the dataframe, when I select from it, the records are duplicated. Is there any way I can solve this?
Thanks Jatin for your comment. I think you can use dropDuplicates or distinct.
Great work
How do you perform pivot operations on the nested columns of a dataframe created from a JSON file?
You can flatten the nested columns first and then pivot.
@@rajasdataengineering7585 The data is coming through an API, so I have to perform the operations dynamically.
Yes, you can read the data through the API in the first step, flatten it in the second step, and apply the pivot function in the third step (see the sketch below).
@@rajasdataengineering7585 Drop a video on this; it's a common problem when it comes to the JSON format.
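(A minimal sketch of those three steps, assuming a hypothetical REST endpoint; the URL and column names are assumptions, and flatten() is the function from the video:)
import requests

# Step 1: read the data from the API (hypothetical endpoint)
resp = requests.get("https://api.example.com/orders")
df = spark.read.json(spark.sparkContext.parallelize([resp.text]))

# Step 2: flatten the nested columns
flat_df = flatten(df)

# Step 3: pivot on one of the flattened columns (column names are assumptions)
pivoted = flat_df.groupBy("customer_id").pivot("order_status").count()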
Hi Raja, thanks for the solution. How do I flatten a field of map datatype? Here {Fname, Mname, Lname} are map keys and {abc, pqr, xyz} are map values.
Ex: Name = {"Fname": "abc", "Mname": "pqr", "Lname": "xyz"}
Hi Satya, the function explode can be used to flatten map fields
@@rajasdataengineering7585 Thanks Raja. Using explode, I am able to flatten the fields. I wanted to display them as separate columns along with the other columns in the dataframe, like below.
Company Name.Fname Name.Mname Name.LName
c1 abc pqr xyz
c2 sss qqq ooo
Hi Satya, after exploding, you need to split each column out, like exploded_column.nested_column, using the withColumn function (see the sketch below).
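(A minimal sketch of pulling fixed map keys out as separate columns; the Name column and key names come from the comment above. getItem works directly on map columns, so explode isn't strictly needed when the keys are known in advance:)
from pyspark.sql.functions import col

result = (df
          .withColumn("Name_Fname", col("Name").getItem("Fname"))
          .withColumn("Name_Mname", col("Name").getItem("Mname"))
          .withColumn("Name_Lname", col("Name").getItem("Lname"))
          .drop("Name"))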
@@rajasdataengineering7585 Thanks Raja for your reply. I have one more question, wondering if you can help.
I have two dataframes containing name (df1) and address (df2) details respectively. I would like to write the name and address details to a JSON file in the below format:
{"Name" : "ABC"}
{"Address" : "Ad1"}
I used the below code to merge both dataframes, followed by a write command to write the merged output to a JSON file:
val Mergedf_1_2:DataFrame = df1.unionByName(df2,true)
Mergedf_1_2.coalesce(1).write.mode("Overwrite").format("json").save("C:/Users/Test.json")
But I am getting output without indentation. I'd appreciate it if you could share some inputs.
{"Name" : "ABC"}
{"Address" : "Ad1"}
@@rajasdataengineering7585 I have a similar requirement where I need to convert the values of an array to columns. I guess you would need a for loop using the withColumn function and then drop the exploded column. How do we implement this in code?
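(A minimal sketch of that loop, assuming an array column named values with a known or maximum length; the column name and length are assumptions:)
from pyspark.sql.functions import col

n = 3  # known/maximum array length (assumption)
for i in range(n):
    df = df.withColumn("values_" + str(i), col("values").getItem(i))
df = df.drop("values")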
Great video and function! It works great; however, I am receiving an error when I try to pass in two dataframes to be flattened. The first works as expected, but when running the function on the second, it returns the error "'str' object is not callable" and points to this line: expanded = [col(col_name+'.'+k).alias(col_name+'_'+k) for k in [ n.name for n in complex_fields[col_name]]]. If I redefine the function between the two dataframes, it works as expected.
I think it has something to do with the expanded variable. Any thoughts? Thanks again!
When we have schema drift in Azure Databricks, how do we get an email alert notification? Please let me know, sir.
Hi Raja, would you please share the notebook code for all your videos? Please suggest where we can find it.
Nice explanation, but the user-defined function is a little bit complex to understand.
Excellent, bro, you are great!
Thanks, glad you liked it!
Good explanation.
The code is not visible in the notebook.
Thanks. I fixed this issue in later videos.
Hi Raja, we are extracting data from a REST API. From two endpoints, two nested JSON files are generated, and my manager asked me to convert them to CSV and also merge them into a single CSV. Is it possible to merge two different schemas and generate a single CSV?
Hi Sravan, yes it is possible. We need to compare the schemas and produce null values for the missing columns.
You can watch this video on comparing schemas: th-cam.com/video/BtUFleFkXMM/w-d-xo.html
@@rajasdataengineering7585 Thank you very much
Welcome
DF = spark.read.json('/json file path, I mean /Filestore/tables')
DF.show()
I would like to know why you used the flatten function here.
If a JSON file has a complex nested structure (let's say 20+ levels), it won't get parsed properly. That is one case.
In another case, while using structured streaming for Kafka integration in Databricks, the entire JSON payload sits as binary data in the value column. Without flattening the data, you can't read it (see the sketch below).
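(A minimal sketch of the Kafka case: cast the binary value column to a string, parse it with from_json, and then the result can be flattened. The bootstrap server, topic, and payload schema are assumptions:)
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

payload_schema = StructType([
    StructField("id", StringType(), True),
    StructField("event", StringType(), True),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "host:9092")  # assumption
       .option("subscribe", "events")                   # assumption
       .load())

# value arrives as binary; cast to string, parse, then expand the struct
parsed = (raw
          .select(from_json(col("value").cast("string"), payload_schema).alias("data"))
          .select("data.*"))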
Sir, can you please explain each step of the program? It will help us greatly.
Sure, I will make another video to explain each step in more detail.
Suuuuuper !!!!!! 🥇
Thank you!
Really good video! Can you share the JSON?
I will do some tests on my side.
Merci, thanks!
@rajasdataengineering7585 Sir, if the video is already uploaded, please provide the link; I am not finding where it is. An explanation of the function you have written would be really helpful for me and others too. Please consider this one, sir.
Hi Srinu, I couldn't find time yet to create an explanation video. I will make one soon and post it.
@@rajasdataengineering7585 Thank you, sir. We will wait some more time, no issue.
Hi Raja, the video was very good, but I am getting the error
"MongoTypeConversionException: Cannot cast STRING into a IntegerType (value: BsonString{value='42'})" while displaying the data. Can you please help me out?
Bro, while I am using this UDF I am getting the error: name 'ArrayType' is not defined.
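(That error usually just means the type classes weren't imported before defining the function; a minimal fix, assuming the function checks ArrayType, StructType, and MapType as in the video:)
from pyspark.sql.types import ArrayType, StructType, MapType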
Awesome videos 👍
Thank you
For me, it's failing due to: dict object has no attribute key.
How do I pick all the contents in the JSON?
I want to take training; is it possible for you to provide it?
This is giving many-to-many relations. Not sure if this is the expected behavior. Any thoughts on this, please?
Yes, it will give many-to-many relationships depending on the nesting level.
@@rajasdataengineering7585 Thanks so much for the response, Raja.
Is it possible to customize the script by giving some static field names to avoid the many-to-many relations? Or if I want to go with one-to-one, what's the best way to customize it?
Can you share the dataset?
Can you share the file link, please?
Can we get this in Python instead of PySpark?
Yes, we can do it in Python as well, but the data would be in the form of nested arrays instead of a tabular format.
Because I'm working on this task... I have tried many ways but nothing helped. I liked this approach, but the requirement is fully Python code. Is there any other way to achieve this, in a pandas dataframe or something like that?
Can you make a fully Python approach for this logic?
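(A minimal pure-Python sketch using pandas.json_normalize, assuming the JSON is a list of records in a local file; the path and separator are assumptions, and array-of-struct fields may still need a pandas explode afterwards:)
import json
import pandas as pd

with open("data.json") as f:  # hypothetical path
    records = json.load(f)

# flatten nested dicts into columns; nested keys are joined with "_"
df = pd.json_normalize(records, sep="_")
print(df.head())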
Hello sir, I'm Brazilian and I don't speak English fluently. Could you enable subtitles like the other videos?
Sure, will enable subtitles for all videos
@@rajasdataengineering7585 This is the only one that doesn't have subtitles active.
Thanks a lot!
You're welcome!
The video was good, but it would be better if you could also explain the function that you have written. It's a bit difficult to understand.
Sir,
kindly share the notebook.
Hi Sree, sure, will do
Hi Raja, is there any code for flattening XML? Please help me.
Hi Gayatri, I don't have any code to flatten XML, but I will try to create one when I get some time.
wow.. just wow
Thanks
Please upload the sample JSON file.
Thank you
You're welcome
Glad I found this playlist, Thank you www.youtube.com/@rajasdataengineering7585
Glad it helps! Keep watching