Optimized ETL's with QueryDatabaseTable and PutDatabaseRecord | Apache Nifi | Part 10

  • Published 30 Sep 2024
  • Time to optimize your dataflow with a new processor in Apache NiFi.
    Support the channel by Subscribing!
    PATREON - Would you like to support the channel in another way? Then check it out.
    / stevenkoon
    TWITTER
    / koon_steven
    What is Apache NiFi?
    Put simply, NiFi was built to automate the flow of data between systems. While the term 'dataflow' is used in a variety of contexts, we use it here to mean the automated and managed flow of information between systems. This problem space has been around ever since enterprises had more than one system, where some of the systems created data and some of the systems consumed data.
    ~-~~-~~~-~~-~
    Please watch: "Use a Grok pattern in the GrokReader with ConvertRecord to Ingest log files | Apache Nifi"
    • Use a Grok pattern in ...
    ~-~~-~~~-~~-~

Comments • 32

  • @diegobotelho8227 3 days ago +1

    Very helpful information... Thanks

  • @Tokolosk 4 years ago +4

    Oh boy oh boy Steven, I have no words to describe how useful your videos have been. We were quoted $4400/month by a company for an AWS solution that I am now able to do on a single $100/month server with NiFi, with capacity to spare... and a second $100/month server ready for DR. Sure, still lots of things to do and learn, but the numbers speak for themselves!
    Your presentation style and pace are great; those 26 minutes went by fast! Looking forward to learning some more! Cheers.

    • @StevenKoon 4 years ago

      I'm thrilled to hear that the video was so useful to you. After your comments on yesterday's video I was thinking that you would really like to see this video next. Thanks again for your comments.

  • @shanthib3295 2 months ago

    Hi Steven - I am learning NiFi. I installed NiFi in Docker and am getting the error org.apache.nifi.processor.exception.ProcessException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure) in ConvertJSONToSQL. I am trying to fetch from Cassandra and insert into MySQL. I'd appreciate your thoughts.

  • @perkinsdavidg 3 months ago

    Hi Steven,
    I really enjoy your YouTube content! I'm currently looking to set parameters from a Hive table (or a similar data source). Could you provide some guidance or point me in the right direction?
    Thanks a lot!

  • @samsal073 4 years ago +2

    Hi Steven,
    Thanks for the videos ...very helpful information.

  • @andreyfilatov3676 2 years ago +1

    Hi Steven, thank you a lot for what you've shown! One question: how do I fill in the Catalog property correctly?

  • @medal16 3 years ago +1

    Nice explanations, thank you! Could you demonstrate how to do this for updates too? In this case an incremental load works very well, but how can I get the updates as well? Thanks.

  • @rahulkumar-jw1by 4 years ago +1

    Awesome video, I like this tutorial. I have one doubt: if I have multiple tables to read and insert, how can we achieve that?

  • @ЮленькаЕрмакова 3 years ago +1

    Hello! I'm trying to configure data selection from Hive (Serialization type = JSON) to Oracle. In the PutDatabaseRecord config I need to select a Record Reader - I selected JsonPathReader, but the state of the JsonPathReader is Invalid. What could be wrong?

    • @StevenKoon 3 years ago

      Hello, if you haven't already: after you select the JsonPathReader, you need to configure the reader and enable it as well. When you look at the properties list in PutDatabaseRecord you can see the Record Reader property with the value of your JsonPathReader, and to the right of that a "Go To" arrow. Click on the arrow and you will be taken to the list of Controller Services. Then configure and enable your reader.

  • @chitrangsharma 3 years ago +1

    I'm glad that I've found your video. Sir, would you please let me know how to transform the data? I basically want to do some kind of masking or updating before inserting the data into the destination database. Thanks in advance.

    • @StevenKoon 3 years ago +1

      There are several ways to do this, depending on the transformation that you want to do. I have many videos where I ETL data from databases or APIs.

  • @abdullahaqeeli8074 3 years ago +1

    Hi Steven,
    Thanks a lot for the NiFi videos, they've been VERY helpful. I'm running into a challenge using the QueryDatabaseTable processor. My incremental load relies on two columns, an ID and an update_time column. When I put these two columns in the Maximum-value Columns property, the generated query to poll for changes has a WHERE clause with an AND between the two column conditions, which results in losing some of the changes. However, I would like to change that to an OR. I'd like my WHERE clause to be where ID > {MAX_ID} OR update_time > {MAX_Update_time}. Any idea how to achieve that?

    • @StevenKoon 3 years ago

      You're limited with that processor on how complex the WHERE conditions you create can get. I have a problem of my own that I'm working on where I want several complex WHERE conditions, and I can't do it with that processor either, or I just can't figure it out. I'm actually looking at whether I can use several ExecuteSQL processors to create a solution. I got the idea from the video I did with the Best Buy API, creating that API loop so multiple pages get pulled. If I figure out a solution that works for me I'll make sure to share it.
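
      A rough sketch of that workaround, with made-up table and column names: ExecuteSQL evaluates NiFi Expression Language in its "SQL select query" property, so you could write the OR-based incremental query yourself and pass the previous maximums in as flowfile attributes, for example:

          SELECT *
          FROM   orders                            -- hypothetical source table
          WHERE  id          > ${max_id}           -- highest ID already loaded
             OR  update_time > ${max_update_time}  -- latest update already loaded
          ORDER BY update_time

      The trade-off is that you then have to track the new maximums yourself (for example with UpdateAttribute or a small control table), which is the bookkeeping QueryDatabaseTable normally does for you.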

  • @pankajpandey7556 3 years ago +1

    Hi Steven, thanks for such a wonderful video. I have started using PutDatabaseRecord after watching this video, but I find there is an issue with PutDatabaseRecord after JoltTransformJSON when inserting data, like NumberFormatException errors. Example: PutDatabaseRecord Failed to put Records to database java.lang.NumberFormatException: For input string: "Tue Oct 05 15:58:22 EDT 2021". Why would that be? My Jolt-transformed JSON field names match the column names. Some columns in the table are extra, with no matching field in the JSON, and vice versa.

    • @StevenKoon 3 years ago

      It sounds like after your Jolt transform, when you are trying to insert the data into the database, you're having issues with the data types of the table you want to insert into. I would suggest using JoltTransformRecord so that you can use a schema with the writer, and in the schema you can define the data types of the fields.

    • @pankajpandey7556 3 years ago

      @StevenKoon Thanks Steven. Yes, I had one field in the DB table which was an integer; on changing that to character varying it worked fine.
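
      To illustrate the mismatch described here (all names are hypothetical): a value like "Tue Oct 05 15:58:22 EDT 2021" cannot be coerced into an integer column, which is what surfaces as the NumberFormatException. Widening the column, as done above, looks roughly like this in PostgreSQL-style SQL:

          -- Failing case: a date string arriving for an INTEGER column.
          -- Widening the column sidesteps the numeric coercion entirely:
          ALTER TABLE events
            ALTER COLUMN created_at TYPE character varying(64)
            USING created_at::varchar;

      The alternative suggested above, declaring the field's type in a record schema so the writer converts the value before the insert, keeps the column a proper timestamp instead.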

  • @WilliamKnak 2 years ago

    "Input requirement: This component DOWS NOT ALLOW an incoming relationship". Is it possible to start this processor after another one?

  • @seshaiahambati1798 1 year ago

    Hi, I have a scenario like this: move a file from one folder to another, then process the file (load it to Snowflake), and then archive it (move the file from the source folder to an Archive folder). Using GetFile and PutFile for this, how do I make each step run only after the previous one, depending on its outcome?

  • @kasunmathota 3 years ago +1

    Thank you so much, I am using your videos to learn NiFi. I am getting java.sql.SQLDataException: None of the fields in the record map to the columns defined, and I still couldn't fix the issue.

    • @kasunmathota 3 years ago

      I am still stuck with the error on the SQL INSERT statement, due to org.apache.nifi.processor.exception.ProcessException: None of the fields in the JSON map to the columns defined.

    • @StevenKoon 3 years ago

      I would check and make sure that the field names in your flowfile are exactly the same as the column names in the table you are trying to insert them into.

    • @kasunmathota 3 years ago +1

      @StevenKoon Thank you, it's my bad. My target database's connection user didn't have the correct permissions. It's working fine now. Thank you so much. I am enjoying NiFi. Your videos helped a lot.

    • @zulfahfauziah2252 2 years ago

      @kasunmathota Hello Kasun, how did you solve the permission issue? Thanks.

  • @carlosnatividade1814 2 years ago

    Good afternoon. I managed to perform all the steps successfully! Thank you very much. Just one question: is there a way to send only the new records to the data warehouse, without the need to send all the records again, avoiding duplication of information?

  • @cetilly 2 years ago

    @Steven Koon Hi Steven. Great video series!!! What is the best processor for writing a large quantity of data to a database via a REST API instead of using SQL DML? For example, a single array of 50,000 JSON objects using InvokeHTTP.

  • @kwamesefah6855 2 years ago

    Can you please put together a video on getting data from MySQL, publishing it to Kafka, consuming it, and then putting it in S3?

  • @toledo3211 2 years ago

    Hello, thanks for the training videos. How do you keep the QueryDatabaseTable processor from reading the old events over and over again?

  • @MADAHAKO 2 years ago

    Sir, your videos are amazing!

  • @enesteymir 2 years ago

    Hello Steven, thank you for these videos. I have used NiFi for several months. I can easily handle migrating tables that contain 5-20 million rows. Now I am loading a big transaction table that contains 550 million rows. Generally I prefer QueryDatabaseTable > PutDatabaseRecord for small migrations, and GenerateTableFetch > ExecuteSQL > PutDatabaseRecord for medium-size tables (5-20 m). I can't use the first method because I get heap-size memory errors, although I tried the max rows per flowfile parameter. For a very big table like 550 m rows, I can use parallel flows where GenerateTableFetch creates different partitions according to a WHERE condition and the order-by or max-value column parameters. This method works, but I'm trying to find much faster approaches. Maybe you can give me some advice on handling a very large table migration from PostgreSQL to VerticaDB without any transformations.
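
    For anyone following the partitioned approach described above, a rough sketch of what the per-branch queries could look like (table name, column, and ranges are made up; GenerateTableFetch produces its own paging queries, this only illustrates the idea of bounding each parallel branch):

        -- Each parallel branch pulls one bounded slice of the big table:
        SELECT * FROM transactions WHERE id >= 1         AND id < 100000001;  -- branch 1
        SELECT * FROM transactions WHERE id >= 100000001 AND id < 200000001;  -- branch 2
        -- ...and so on. Keeping ExecuteSQL's Max Rows Per Flow File and Output
        -- Batch Size modest means no single result set has to sit in the JVM
        -- heap as one giant flowfile.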

    • @yasmeen2236 8 months ago

      Hello Mr. Teymir, may I ask what you did in the end to handle the big table?