Pierre Villard
France
Joined Mar 3, 2012
Committer and PMC member for the Apache NiFi project, I'm also a Principal Solutions Architect at Datavolo. Believer in open source and willing to do my best to further develop the awesome community around NiFi!
Datavolo NiFi Flow Diff GitHub Action
Demo of the new feature released by Datavolo: a GitHub Action to improve the Pull Request experience when using the GitHub Registry Client in NiFi to use git branches, make changes on your flow versions and submit the changes for review via a pull request.
Useful resources:
- Datavolo Dev Center: devcenter.datavolo.io/
- Datavolo website: datavolo.io/
- Datavolo documentation for CI/CD: docs.datavolo.io/docs/category/nifi-cicd
- Datavolo YouTube Channel: www.youtube.com/@Datavolo2
- Datavolo Flow Diff GitHub Action: github.com/marketplace/actions/datavolo-flow-diff
Views: 441
Videos
Cloudera Flow Management Kubernetes Operator Overview
340 views, 4 months ago
End to end video showing how to install the newly released Cloudera Flow Management Kubernetes Operator, then deploy a NiFi cluster, as well as a NiFi Registry instance, connect everything together and start designing flows, all of this on an OpenShift cluster. The video is also showing manual scaling of the NiFi cluster, data resiliency in case a pod is going down, and also how to use an Horiz...
Cloudera Webinar - GenAI and beyond with NiFi 2.0
775 views, 5 months ago
This is a recording of a Cloudera webinar I gave on May 21st, 2024, to discuss the new features coming with NiFi 2.0. I'm talking about the topics below and also doing a few demos:
- running NiFi on Kubernetes with zookeeper-less deployments and rolling upgrades
- the new Python API for Generative AI use cases
- Stateless at Process Group level and CDC use cases
- the new Rules Engine and...
Data Warehouse Ingestion Patterns with Apache NiFi
2.2K views, 7 months ago
This video talks through the pros and cons of three patterns you can use in Apache NiFi to ingest data into a table created with the Iceberg format. - 1st option: PutIceberg Simply push data using the PutIceberg processor. Super efficient but really only does inserts of new data into the table. It may not be a fit in all cases. - 2nd option: PutDatabaseRecord Great option that is a bit more gen...
Ingesting data with NiFi into a Delta Lake table powered by Databricks
1.6K views, 1 year ago
In this video, I demo the use of the Cloudera exclusive UpdateDeltaLakeTable processor available in the NiFi distributions of Cloudera (Cloudera Flow Management & Cloudera DataFlow). It allows you to easily and efficiently push data into a Delta Lake formatted table. For the demo, I'm using the trial of Databricks on AWS and running a NiFi cluster in Cloudera Public Cloud on AWS. As always, com...
S3 to Cloudera Data Warehouse w Trigger Hive Metastore Event processor - Cloudera DataFlow Functions
366 views, 1 year ago
Using Cloudera DataFlow Functions to easily ingest files landing into S3 into tables of Cloudera Data Warehouse by using the newly added Trigger Hive Metastore Event processor in Apache NiFi. This is the most efficient way to run NiFi for ingesting data into CDW tables as NiFi does not have to run 24/7 and will only get executed when there is data to be ingested. This is extremely cost efficien...
Cloudera DataFlow Functions - Azure Function App
229 views, 1 year ago
This video is a quick walkthrough on how to deploy Cloudera DataFlow Functions on Azure. The goal is to quickly have a flow running as an Azure Function App. In this demo, I expose a flow that is triggered by an HTTP call sending an image, the flow receives the image, resizes it and sends it back to the caller. This is a great way for running NiFi flows in a completely serverless way that is su...
Kafka to Kafka routing with external database table lookup in Apache NiFi
842 views, 1 year ago
This video walks you through how to implement an efficient Kafka to Kafka routing based on an external mapping table. Based on some fields contained in the consumed messages, a destination topic must be retrieved from a mapping table of an external database before sending the message to the right Kafka topic. There are many ways to implement this use case with many different tools (Kafka Stream...
FlowFile Concurrency at Process Group level
1.1K views, 1 year ago
This video walks you through the use of FlowFile Concurrency at Process Group level. This feature is very useful to replace the Wait/Notify processors (that are not always easy to use properly) in some scenarios. A very common use case is to deal with a FlowFile (let's say a ZIP file) that is going to generate a bunch of child FlowFiles (unpacking the ZIP file into the individual files of the a...
Get vs List+Fetch and using a Record Writer
2.3K views, 1 year ago
The Get versus List/Fetch pattern is a very frequent topic in NiFi and it is key to understand what the differences are and how to properly use the List/Fetch pattern. Besides, it's now possible to configure a Record Writer on the ListX processors, which provides better performance and better memory management when listing a large number of files. This video focuses on GetFile versus Lis...
Cloudera DataFlow Functions - Real-time offloading SFTP server to AWS S3 with NiFi & AWS Lambda
543 views, 1 year ago
This is a demo on how to use Cloudera DataFlow Functions to run Apache NiFi flows as functions (in this case AWS Lambda) to offload in real-time a remote SFTP server into AWS S3 in a completely serverless way. Cloudera DataFlow Functions is a powerful option for running batch oriented use cases where NiFi does not need to run 24/7 by leveraging AWS Lambda, Azure Functions or Google Cloud Functi...
Automating NiFi flow deployments from DEV to PROD
3.9K views, 1 year ago
This video is a refresh of a webinar I did about 2 years ago on how to automate flow deployments across environments. Here is the previous video: th-cam.com/video/XYHMExiWM6k/w-d-xo.html This new video goes one step further and leverages the new concept of Parameter Context Providers in combination with the Scripted Hook in the NiFi Registry to nicely automate the deployment of a new version of ...
S3 to Iceberg tables in CDW - Cloudera DataFlow Functions - AWS Lambda
589 views, 1 year ago
Demonstration and explanation of how to use Cloudera DataFlow Functions in AWS to set up an AWS Lambda function triggered by files landing in an S3 bucket and push the data of the files into an Iceberg table in Cloudera Data Warehouse (CDW) in Public Cloud. Resources: Cloudera DataFlow Functions - docs.cloudera.com/dataflow/cloud/functions.html Cloudera DFF in AWS - docs.cloudera.com...
Connect NiFi with the Cloudera DataFlow Catalog using the new Cloudera DataFlow Registry Client
572 views, 1 year ago
The Cloudera DataFlow Catalog is basically a FREE SaaS version of the NiFi Registry. In this video I'm showing you how to connect your Cloudera Flow Management deployments with the DataFlow Catalog and how to use it for versioning your flow definitions and for checking out the ReadyFlows that are made available to you by Cloudera. This feature is available starting with CFM 2.1.5 for on-prem de...
Replay Last FlowFile + Enable/Disable all Controller Services make flow design easier!
311 views, 1 year ago
A short video to demo a few new things available in Apache NiFi: - a new ConsumeTwitter processor leveraging the latest APIs - the possibility to replay the last FlowFile - the possibility to enable/disable all controller services at process group level Small capabilities, but making flow design much faster/better! Thanks for watching!
Pushing data into Snowflake via Snowpipe using Apache NiFi
2K views, 1 year ago
Cloudera DataFlow Functions - AWS Lambda - CRON driven Database offload to HTTP Slack notification
140 views, 1 year ago
Automatically synchronize versioned NiFi flows from NiFi Registry to Cloudera DataFlow Catalog
641 views, 2 years ago
Apache NiFi - CDP Public Cloud - Multi env setup & NiFi Registry instances sharing an RDS instance
526 views, 2 years ago
[Twitch] Apache NiFi Monitoring (reporting tasks, Prometheus, status history, diagnostics, etc)
6K views, 3 years ago
[Ask Me Anything] - 1st Twitch session about Apache NiFi
274 views, 3 years ago
I am getting this error when I try to use the Python API to create my own processor in Apache NiFi 2.0.0. Can you please tell me why?
2024-11-08 16:08:37,658 ERROR [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'frameworkClusterConfiguration': Unsatisfied dependency expressed through method 'setFlowController' parameter 0: Error creating bean with name 'flowController' defined in class path resource [org/apache/nifi/framework/configuration/FlowControllerConfiguration.class]: Failed to instantiate [org.apache.nifi.controller.FlowController]: Factory method 'flowController' threw exception with message: Failed to communicate with Python Controller
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:895)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.inject(AutowiredAnnotationBeanPostProcessor.java:848)
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:145)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:508)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1421)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:599)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:975)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:962)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:624)
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:394)
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:274)
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:102)
    at org.eclipse.jetty.ee10.servlet.ServletContextHandler.callContextInitialized(ServletContextHandler.java:1591)
    at org.eclipse.jetty.ee10.servlet.ServletContextHandler.contextInitialized(ServletContextHandler.java:497)
    at org.eclipse.jetty.ee10.servlet.ServletHandler.initialize(ServletHandler.java:670)
    at org.eclipse.jetty.ee10.servlet.ServletContextHandler.startContext(ServletContextHandler.java:1325)
    at org.eclipse.jetty.ee10.webapp.WebAppContext.startWebapp(WebAppContext.java:1342)
    at org.eclipse.jetty.ee10.webapp.WebAppContext.startContext(WebAppContext.java:1300)
    at org.eclipse.jetty.ee10.servlet.ServletContextHandler.lambda$doStart$0(ServletContextHandler.java:1047)
    at org.eclipse.jetty.server.handler.ContextHandler$ScopedContext.call(ContextHandler.java:1237)
    at org.eclipse.jetty.ee10.servlet.ServletContextHandler.doStart(ServletContextHandler.java:1044)
    at org.eclipse.jetty.ee10.webapp.WebAppContext.doStart(WebAppContext.java:499)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:120)
    at org.eclipse.jetty.server.Handler$Abstract.doStart(Handler.java:491)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:120)
    at org.eclipse.jetty.server.Handler$Abstract.doStart(Handler.java:491)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:120)
    at org.eclipse.jetty.server.Handler$Abstract.doStart(Handler.java:491)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.server.Server.start(Server.java:624)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:120)
    at org.eclipse.jetty.server.Handler$Abstract.doStart(Handler.java:491)
    at org.eclipse.jetty.server.Server.doStart(Server.java:565)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:93)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:781)
    at org.apache.nifi.NiFi.<init>(NiFi.java:172)
    at org.apache.nifi.NiFi.<init>(NiFi.java:83)
    at org.apache.nifi.NiFi.main(NiFi.java:332)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'flowController' defined in class path resource [org/apache/nifi/framework/configuration/FlowControllerConfiguration.class]: Failed to instantiate [org.apache.nifi.controller.FlowController]: Factory method 'flowController' threw exception with message: Failed to communicate with Python Controller
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1337)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1167)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredMethodElement.resolveMethodArguments(AutowiredAnnotationBeanPostProcessor.java:887)
    ... 49 common frames omitted
Hi Pierre, great demo! We use the GitHub integration with NiFi Registry version 1.23. I guess this GitHub Action will not work with 1.x versions? There are no pull requests... Everything is submitted to master, as you are well aware. But if each commit could have a diff description from the previous version, it would be great!
This is correct. All of this is only available in NiFi 2, where you have GitHub/GitLab registry clients to directly integrate NiFi with those repositories without the need for the NiFi Registry. The diff description can be quite large depending on how significant the changes are, so I'm not sure it would fit well as the version comment. However, with the pull request approach, it'd be fairly easy for the one merging the pull request to use the GitHub Action comment as the text of the squashed commit that is merged into the main branch.
@pvillard31 thanks. Didn't think of the size problem. Another question: how do you see migration from 1.x to 2.x with regard to going from NiFi Registry + GitLab (1.x) to pure GitLab (2.x)? Any thoughts / recommendations on the process?
@PK-vp4hd This is a good question. It'll likely need a bit of work to be honest given that the storing itself is quite different. At a high level, I'd say:
- create a new repo, create the buckets (directories) there
- move the flows into the new repo, one file per flow definition, and create a new commit for each version you want to retain for that versioned flow
- create the new registry client in NiFi 2
- and then you can move process groups one by one by modifying things in flow.json.gz and use the proper references (registry client ID, and version being the commit ID)
Each time you'll update the flow.json.gz you'll need to stop NiFi, make the changes, restart... You can move flows iteratively or all at the same time depending on your needs. For some time you might want to have a mix of NiFi Registry and GitLab Registry client until everything is transitioned.
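As a rough illustration of the last step (repointing a process group inside flow.json.gz), here is a minimal Python sketch to run while NiFi is stopped. The JSON key names (`rootGroup`, `processGroups`, `versionedFlowCoordinates`, `registryId`, `bucketId`, `flowId`, `version`) are assumptions about the file layout, not something stated in this thread; check them against your own flow.json.gz and work on a backup copy. The function names are hypothetical.

```python
import gzip
import json

# ASSUMPTION: key names below are a guess at the flow.json.gz layout;
# confirm them against your own file before relying on this sketch.
COORDS_KEY = "versionedFlowCoordinates"


def repoint_process_group(group, pg_name, registry_id, bucket, flow_id, commit_id):
    """Recursively find the process group named pg_name and rewrite its coordinates.

    Returns True if a matching, version-controlled group was updated.
    Only the first match is changed."""
    if group.get("name") == pg_name and COORDS_KEY in group:
        coords = group[COORDS_KEY]
        coords["registryId"] = registry_id  # ID of the new registry client
        coords["bucketId"] = bucket         # bucket (directory) in the new repo
        coords["flowId"] = flow_id          # flow definition name in the new repo
        coords["version"] = commit_id       # the version is the commit ID
        return True
    return any(
        repoint_process_group(child, pg_name, registry_id, bucket, flow_id, commit_id)
        for child in group.get("processGroups", [])
    )


def rewrite_flow(path_in, path_out, **kwargs):
    """Read a gzipped flow file, repoint one process group, write a new file."""
    with gzip.open(path_in, "rt", encoding="utf-8") as f:
        flow = json.load(f)
    updated = repoint_process_group(flow["rootGroup"], **kwargs)
    with gzip.open(path_out, "wt", encoding="utf-8") as f:
        json.dump(flow, f)
    return updated
```

Writing to a separate output path keeps the original file intact until the result has been inspected.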
@pvillard31 Hm, this sounds quite intrusive to flow.json... I was thinking more in the direction of migrating as is, with Registry + Git, and then changing source control one by one using the UI?
@PK-vp4hd There is no option to "attach" something existing in NiFi to something existing in the repo. The other option, if you don't really care about retaining the history of versions, is to stop version control on the process group and start version control with the new registry client as a new flow. This is for sure easier, but you lose all of the history of versions.
Oh, this is very nice. Excited to try it.
Hello! Thank you very much for this detailed description. Really interesting, because the embedded NiFi Registry is quite restricted. Is the registry service shown in the video available in NiFi 1.27? Is it possible to get it out of the box?
Hey, thanks for the comment. This feature is one of the new features coming with NiFi 2.0 (it's finally being released this weekend as a GA release), and this is not available in NiFi 1.x due to the API breaking changes it is introducing.
@pvillard31 Oh, I see. More reasons to upgrade users' instances. In any case, this is the best channel about NiFi. Thank you very much!
Thanks for sharing! Insightful content. I am a beginner and I am wondering whether NiFi is able to handle cross-team collaboration. If so, I would be glad if you could share some useful links. At the same time, I doubt it is really a good choice for heavy ETL/ELT or even CDC (even though it is possible to implement it). I see it as good only as a mediation and routing tool; am I mistaken? Thank you for your feedback!
Hi, NiFi is definitely able to handle cross-team collaboration. The concept of registry client is usually what is recommended to version control flow definitions and have multiple people working on the same flows as well as building CI/CD pipelines to test and promote flows in upper environments. NiFi should be considered more as an ELT tool rather than an ETL tool. Any kind of transformation is technically doable at FlowFile level in NiFi but if you need to do complex transformations over multiple FlowFiles (joins, aggregations, etc), then a proper engine like Flink for example would likely be better (or delegate this to whatever destination system you're using - data warehouse, etc). Finally, CDC is definitely something you can do very well with NiFi. Some vendors providing support on NiFi are providing NiFi processors based on Debezium for capturing CDC events as well as processors to push those events into systems (Iceberg, Kudu, etc). There are some things to keep in mind when designing a flow to make sure event ordering is preserved but there are many options to do that in NiFi very well. Hope this helps!
@pvillard31 Hi, so buckets can be considered as separate projects in NiFi where data engineers can work together without disturbing other teams that are on other buckets using the same NiFi instance? And if a team wants to test or deploy a given version, it could be done through scripts that they need to implement and maintain? If so, this would be very interesting! I will try to have a closer look. Thank you and keep posting!
@nasrinidhal4162 Yeah, buckets can be a separation for different teams or for logically grouping flows serving a similar purpose and then you have flows versioned in that bucket and multiple people can work on the same flow. I have a video coming soon with some nice features of NiFi 2 with branching, filing pull requests and comparing versions before merging a pull request for a new flow version. I have a series of blog posts and videos coming that focus on CI/CD with NiFi.
@pvillard31 Cool! That would be amazing! Thanks for sharing again and keep posting.
Hi, and thanks for the video. I have a question though... Would there be a way to handle transactions in a scenario where I'm upserting into multiple tables, and I'd like the whole process to succeed or fail? Coming from Talend, I usually have a pre-job that starts a transaction on a db connection, all "processors" will use the transaction, and in the post-job I will commit or rollback, depending on whether there is an error or not.
I guess the closest thing to what you describe is the option in ExecuteSQL and/or ExecuteSQLRecord processors to set SQL queries in the pre-query property and in the post-query properties. But if you mean a transaction to the database that would span across multiple processors in the flow, then it's not possible today. I could see ways of implementing this with custom processors and controller services but there is nothing out of the box today. That could be a valid feature request if you'd like to file a JIRA in the Apache NiFi project.
Thanks! The third option is phenomenal.
@Pierre when using option 3, how would you handle a scenario where you want a surrogate key on the destination table?
Hi, I am trying to use the PutIceberg processor in NiFi with S3 as the data location and AWS Glue as the Hive Metastore, but I am getting the error below: `Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"` Are there some Hadoop AWS binaries I need to install in my NiFi? I would really appreciate the help. Also, I am using HadoopCatalogService instead of HiveCatalogService.
@pierre could you share with me the grafana json template please
Good stuff!
Nice video! May I know where I can learn more about CDC use cases?
I'll record a video of this very soon :)
Great video! (I'll replace a lot of Wait/Notify processors)
Hi Pierre, is it possible to achieve this by using internal stage when my data is on a server and not locally?
Hi - I'm not a Snowflake expert, so take what I'm saying with a grain of salt. You can configure an internal stage in Snowflake to point at object store locations, so you could technically push the data to the object store location and let Snowflake do the job. I'm not sure if that answers your question. If not, can you provide more details about what you mean by "my data is on a server"? If you want to use NiFi, you'd first need to acquire this data via NiFi with some of the provided options, depending on the interfaces you can use with that server, and then use a similar pipeline as presented in this video.
This is awesome. Great Video.
Thanks for the great video. Does that work with the git flow provider instead of a DB?
You can't have two NiFi registry instances pointing at the same git repo but if you have a single NiFi Registry across your different environments, then the same approach can be applied, yes.
Thanks for this. I loved it
If I have to export a template, can I use my directory as the files for the ListFile?
I'm not sure I understand the question. Do you mean exporting your flow as an XML file and save it in the same directory as the directory for which you have ListFile processor configured? If yes, what is the objective? (Note that XML templates are going away in NiFi 2.0)
@pierre please share the grafana import json file
Hi Pierre, great video. This opens up new opportunities 🤓 Just wondering why I don't get the batch attributes when I set the outbound policy to Batch Output. I'm using NiFi version 1.19. Any ideas?
I don't see any reason why that would not work in NiFi 1.19. Can you share a JSON flow definition somewhere that reproduces the issue? I'll have a look
@pierre please share id of grafana OR json
CAN YOU PLEASE ADD THE LINK FOR THE TEMPLATE THAT YOU WORK WITH SO WE CAN DOWNLOAD IT
Thank you very much Pierre for this very interesting video! Just to be sure: do you need to use the ID of the table because you didn't define an explicit location for your table? Or do you need the ID for the UpdateDeltaLakeTable processor even if you define a location?
Thanks for the feedback. The processor expects the table path (with the table ID) for the location of where the transaction log files are. The data file path property is for the actual data being added to the table. It could be two completely different locations (external tables for example).
Excellent Pierre! this is very useful processor! Will this processor be available in some moment in the Nifi Open Source distribution? Saludos!
Thanks for the feedback. Not in the short term, but it could happen next year though.
Hi Pierre, can you tell me what you use for the HTTP Status Code and HTTP Context Map configuration of HandleHttpRequest and HandleHttpResponse? Or could you write a note on your blog sharing the details of this use case? I find it very interesting. I redid all the work you did, but I was unable to continue due to some missing configuration info.
Happy to do a more detailed recording about those two processors but I'm not sure I understand your question. After the HandleHTTPRequest, you can route your flow file depending on your use case and change the content of your flow file accordingly, you can also set a flow file attribute with the HTTP code you want to return, you can then configure your HandleHTTPResponse processor to reference that flow file attribute for the HTTP code to return. In terms of context map, you can just select the default option provided by NiFi. You don't need to change the defaults unless you expect a massive amount of concurrent transactions to be processed by NiFi.
@pvillard31 First of all, thank you for taking the time to answer my question. As for the misunderstanding, what I meant is that after trying to rebuild the process group, I got these error messages: HandleHTTPRequest ('HTTP Context Map' is invalid because HTTP Context Map is required), InvokeHTTP ('HTTP URL' is invalid because HTTP URL is required) and HandleHTTPResponse ('HTTP Status Code' is invalid because HTTP Status Code is required). So for the time being, I would like to know what you used as input for the HTTP URL and HTTP Status Code, so my flow can run successfully.
Hi Pierre, thanks for the video. This is definitely a new mindset. Do you see any performance differences between using this approach and the Wait/Notify one? Do you think Wait/Notify can be more performant when the child FlowFiles from different original files are processed on multiple threads?
Wait/Notify is more about providing very fine-grained control over how things are managed with the children. If your use case is simple enough, this approach makes things much easier. Depending on the use case, Wait/Notify can be a better approach and, in the end, based on the configuration and use, can be more performant.
This is an excellent tutorial on a very underrated feature of NiFi. Micro-batching is a super important usecase. Thanks for the fantastic video!
This is an exciting pattern I didn't know about. Thanks for sharing!
Thank you very much for this video. It shows more insight on how to fetch files correctly.
🤯 Super
Hi @pierre, I am getting an error : "No controller service types found that are applicable for this property" even after I imported the nar file. I am using nifi 1.19.1 .
There are two NARs: mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-services-api-nar mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-processors-nar
@pvillard31 I tried to import these two NARs by hot loading them into NiFi but I am still facing the issue. I am using NiFi version 1.19.1.
@VuNguyen-i2k What is showing in the logs exactly? More details?
Thank you very much for this.
I am just getting into using NiFi. We have an on-prem MS SQL Server and I would like to move data from it to our Snowflake instance without writing tons of code. I created a stage and a pipe. Will I need to do that for every table? There are 59 in this database. I'm looking for the most efficient way to do this.
I'm no Snowflake expert. On the NiFi side you could have something like ListDatabaseTables -> GenerateTableFetch -> ExecuteSQLRecord and then the Snowflake part. That would retrieve all of the data from all your tables. You can then leverage expression language and flow files attributes to have the data sent in the right destinations/tables. You'd still need to create your tables and pipes in Snowflake first, but I assume this can be somehow scripted with their CLI.
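One way to script the per-table stage and pipe creation is to generate the SQL statements from the list of table names and run them with whatever Snowflake client is at hand. The sketch below only builds the statements. The database and schema names, the `_STAGE`/`_PIPE` naming convention and the CSV file format are assumptions made for illustration, and table names are assumed to be simple unquoted identifiers.

```python
def snowflake_ddl(tables, database="MY_DB", schema="PUBLIC"):
    """Return one CREATE STAGE / CREATE PIPE statement pair per table name.

    MY_DB, PUBLIC, the naming convention and the CSV file format are
    placeholders; adjust them to the actual environment."""
    statements = []
    for table in tables:
        fq = f"{database}.{schema}.{table}"
        statements.append(f"CREATE STAGE IF NOT EXISTS {fq}_STAGE;")
        statements.append(
            f"CREATE PIPE IF NOT EXISTS {fq}_PIPE AS "
            f"COPY INTO {fq} FROM @{fq}_STAGE "
            "FILE_FORMAT = (TYPE = 'CSV');"
        )
    return statements


if __name__ == "__main__":
    # Hypothetical table names; in practice these could come from the source database.
    for statement in snowflake_ddl(["CUSTOMERS", "ORDERS"]):
        print(statement)
```

The destination tables themselves still have to exist before the pipes are created.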
@pvillard31 I am looking to follow this guide but I do not see the SnowflakeComputingConnectionPool. I have downloaded and placed nifi-snowflake-processors-nar-1.20.0.nar and nifi-snowflake-services-api-nar-1.20.0.nar into my lib folder for NiFi. I'm trying to make a connection with the DBCPConnectionPool connector and I'm just getting a 403 error.
Please tell me: what is the difference between starting the processor using "Run" and using "Run Once"?
Run will start the processor and keep it running until you stop it. The processor will be scheduled according to its configuration (run schedule, cron, etc). Run Once will start the processor, execute it once, and stop it immediately. This is particularly useful when developing flows and when you want to process/generate just one flow file to check that the processor is doing what you expect.
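The same two actions can also be triggered outside the UI. The sketch below builds (but does not send) the HTTP request, assuming the NiFi REST API exposes `PUT /nifi-api/processors/{id}/run-status` with a `state` of `RUNNING`, `STOPPED` or `RUN_ONCE`; that endpoint and payload shape reflect my understanding of the API rather than anything stated here, so check the REST API documentation for your NiFi version. The helper name, base URL and processor ID are hypothetical, and authentication is left out.

```python
import json
import urllib.request


def run_status_request(base_url, processor_id, revision_version, state):
    """Build (without sending) a PUT request that changes a processor's run status.

    ASSUMPTION: endpoint path and body shape follow the NiFi REST API as I
    understand it; verify against the documentation for your version."""
    if state not in {"RUNNING", "STOPPED", "RUN_ONCE"}:
        raise ValueError(f"unsupported state: {state}")
    body = {"revision": {"version": revision_version}, "state": state}
    return urllib.request.Request(
        url=f"{base_url}/nifi-api/processors/{processor_id}/run-status",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Sending the request would be a matter of passing it to `urllib.request.urlopen` once the required credentials (for example a bearer token header) have been added.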
Sorry! The last version that I see on the website is version 1.19.1 and it doesn't have that processor? How do you get that version?
The Snowflake bundles are not included by default in the Apache NiFi convenience binary due to the ASF policy with respect to the size of the binary we provide. Users can download the NARs from the Maven repository and drop them in their NiFi installation. mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-processors-nar mvnrepository.com/artifact/org.apache.nifi/nifi-snowflake-services-api-nar
@pvillard31 Hi Villard, I have dropped the NAR files into my NiFi installation but I got an issue when I tried to create the Controller Service ("No controller service types found that are applicable for this property."). Can you help me? Thanks
@jl-acosta Can you check the nifi-app.log files when you add the two NARs? If you added the NARs next to the other ones, then the logs would be at startup; if you added the NARs in the hot loading directory, then you should have logs around the moment you dropped the NARs there.
@pvillard31
2023-03-24 16:30:12,843 INFO [main] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: C:\NiFi\nifi\.\work\nar\extensions\nifi-snowflake-services-api-nar-1.20.0.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\nifi-snowflake-services-api-nar-1.20.0.nar-unpacked]
2023-03-24 16:30:13,566 INFO [main] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: C:\NiFi\nifi\.\work\nar\extensions\nifi-snowflake-processors-nar-1.20.0.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\nifi-snowflake-processors-nar-1.20.0.nar-unpacked]
org.apache.nifi.processors.snowflake.StartSnowflakeIngest org.apache.nifi:nifi-snowflake-processors-nar:1.20.0 || .\work\nar\extensions\nifi-snowflake-processors-nar-1.20.0.nar-unpacked
org.apache.nifi.processors.snowflake.GetSnowflakeIngestStatus org.apache.nifi:nifi-snowflake-processors-nar:1.20.0 || .\work\nar\extensions\nifi-snowflake-processors-nar-1.20.0.nar-unpacked
Everything looks good I guess
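For anyone repeating this check, a small script can pull the relevant lines out of nifi-app.log instead of scrolling through it. This is a minimal sketch that assumes the "Loaded NAR file: ... as class loader" wording seen in the log excerpt in this thread; the function name and the log path in the usage note are hypothetical.

```python
import re

# Pattern modelled on the "Loaded NAR file" log lines quoted in this thread.
NAR_LINE = re.compile(r"Loaded NAR file: (?P<path>\S.*?) as class loader")


def loaded_nars(log_lines, keyword):
    """Return the unpacked NAR paths that contain keyword, in log order."""
    found = []
    for line in log_lines:
        for match in NAR_LINE.finditer(line):
            if keyword in match.group("path"):
                found.append(match.group("path"))
    return found
```

Typical use would be `loaded_nars(open("logs/nifi-app.log", encoding="utf-8"), "snowflake")`; an empty result suggests the NARs were not picked up at all, while two entries (services API and processors) suggest the loading itself worked.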
Thank you so much for this video. Please, kindly look into doing video on Wait & Notify.
Can you do a video on how to implement the NAR provider using NiFi Registry? I seem to be stuck.
Thanks, nice demo! When will there be courses on Udemy? 😉
Hi, could you share the ID of your Grafana template
@pierre can you share the json
where I can find that grafana import json?
Hi Pierre, is there any good solution for fetching an extremely large file in a NiFi flow? The file is larger than the content repository.
What would be the protocol to retrieve the file and what would you be doing on this file? If it's just pass through to move the file from one place to another, then NiFi Stateless could be an option.
This looks good. Do we have a working example of Site2SiteProvenanceReportingTask?