Apache NiFi – A graphical streaming tool with workflow features

Apache NiFi:

  • It is a data streaming and transformation tool.
  • It has a nice web-based UI where we can configure the workflow.
  • To understand the power of NiFi, let's play with it directly.

Let's play with NiFi:

  • Let's stream a live Twitter feed from the Twitter hose.
  • Prerequisites
    • A Twitter developer account – if you don't have one, follow these steps:
      • Go to – https://apps.twitter.com
      • Log in using your existing Twitter account (if you don't even have a Twitter account, you will have to create one).
      • After logging in, click on "Create New App".
      • Once the app is created, go to the "Keys and Access Tokens" tab and copy the "Consumer Key (API Key)" and "Consumer Secret (API Secret)".
      • You will need these for authentication in the Twitter hose processor in the NiFi UI.
  • Install and Configure NiFi:
    • First connect to the Sandbox console using 'maria_dev' credentials and elevate your permissions to root.
    • Create a folder for NiFi – mkdir /opt/nifitest
    • Download the NiFi 1.4.0 tarball – wget https://archive.apache.org/dist/nifi/1.4.0/nifi-1.4.0-bin.tar.gz
    • Extract the file – tar xvfz nifi-1.4.0-bin.tar.gz
    • Change into the extracted folder – cd nifi-1.4.0
    • Update this file – nano conf/nifi.properties
      • Update the following:
        • nifi.web.http.port=8090
    • Now start NiFi – ./bin/nifi.sh start
    • Now log in to the NiFi UI – http://hostname:8090/nifi or http://IPAddress:8090/nifi
    • There are hundreds of templates available to set up and configure a workflow in NiFi. Here is the link – https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
    • Let's download the one related to the Twitter hose – https://cwiki.apache.org/confluence/download/attachments/57904847/Pull_from_Twitter_Garden_Hose.xml?version=1&modificationDate=1433234009000&api=v2
    • Import the downloaded template into Nifi –
    • Verify that the template upload/install was successful
    • Now click and drag the template icon button to the work area as shown in the snapshot below:
    • You will see the template like below:
    • Click on "Grab Garden Hose" to select the box first, then right-click to open the context menu and click 'Configure'.
    • In the 'Configure Processor' pop-up that opens, enter the following:
      • Twitter Endpoint = Filter Endpoint
      • Consumer Key, Consumer Secret, Access Token and Access Token secret from the twitter app you created.
    • Now create a directory in HDFS where we can save the tweets, and open up its permissions.
      • hadoop fs -mkdir /user/maria_dev/nifitest
      • hadoop fs -chmod 777 /user/maria_dev/nifitest
    • Now come back to the NiFi UI, click and drag the processor button as in the snapshot below, and filter for the HDFS (PutHDFS) processor
    • Configure the processor and integrate it with the workflow: connect
      "Find only Tweets" to "PutHDFS" by dragging a relationship arrow between them.
    • Now configure the PutHDFS processor (right-click and "Configure") and set the directory to – /user/maria_dev/nifitest
    • Right click on “matched” queue and configure the following:
    • Now select all the components (Ctrl + A) and press the process arrow button as shown below.
    • Now let's verify that data was written to HDFS
    • Check – hadoop fs -ls /user/maria_dev/nifitest
    • Now that the data has been pulled into HDFS, let's use a tool to visualize it.
    • Let's use Hive for now –
      • Create an external table pointing to the tweets folder in HDFS:

        CREATE EXTERNAL TABLE tweetshosedata(
          tweet_id bigint,
          created_unixtime bigint,
          created_time string,
          lang string,
          displayname string,
          time_zone string,
          message string)
        ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        WITH SERDEPROPERTIES ('field.delim'='|', 'serialization.format'='|')
        STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION '/user/maria_dev/nifitest';
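The NiFi flow writes each tweet as one pipe-delimited line, which is why the SerDe above declares '|' as the field delimiter. As a rough illustration of how such a row maps onto the table's columns (the sample line below is invented):

```python
# Columns in the order declared in the Hive table above.
COLUMNS = ["tweet_id", "created_unixtime", "created_time", "lang",
           "displayname", "time_zone", "message"]

def parse_tweet_row(line):
    """Split one pipe-delimited row into a dict keyed by column name."""
    values = line.rstrip("\n").split("|")
    return dict(zip(COLUMNS, values))

# An invented sample row, for illustration only.
row = parse_tweet_row("123|1433234009|2015-06-02|en|alice|UTC|hello world")
print(row["displayname"])  # alice
```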

      • Now query to view the data
      • select displayname, count(*) as NumberOfTweets
        from tweetshosedata
        group by displayname
        order by NumberOfTweets desc;

      • So we streamed data from the Twitter hose, uploaded it into HDFS, and then created an external table in Hive to visualize the data using a Hive query.




Apache Flink – Highly scalable streaming engine

  • Why Flink:
    • more scalable than Storm – up to 1000s of nodes (massive scale)
    • more fault tolerant than Storm
      • maintains "state snapshots" to guarantee exactly-once processing
    • like Storm, it is also based on event-based streaming
  • Flink vs Spark Streaming vs Storm:
    • faster than Storm
    • real-time streaming like Storm
    • event based like Storm, and supports windowing
    • the exactly-once guarantee makes it a good fit for financial applications
  • Flink Architecture:
    • Flink can sit on top of a local/standalone or cluster-based Hadoop system, or even in the cloud on AWS or Google Cloud.
    • Flink has 2 distinct sets of APIs – the DataStream API (more like Storm) and the DataSet API (more like Spark).
    • In each universe –
      • the DataStream API supports event-based processing as well as SQL/Table-based processing.
      • the DataSet API supports FlinkML, Gelly and SQL/Table-based processing.
    • Connectors for Flink – HDFS, Kafka, Cassandra, RabbitMQ, Redis, Elasticsearch, NiFi etc.
  • Let's play with Flink:
    • Login to the console using maria_dev credentials and elevate to root permissions.
    • Download Apache Flink from flink.apache.org, using this link – wget https://archive.apache.org/dist/flink/flink-1.2.0/flink-1.2.0-bin-hadoop27-scala_2.10.tgz
    • Extract the tarball – tar xvf flink-1.2.0-bin-hadoop27-scala_2.10.tgz
    • Move to the flink folder – cd flink-1.2.0
    • Now let's first edit the config files – cd conf
    • Edit – nano flink-conf.yaml
      • change jobmanager.web.port: 8081 to jobmanager.web.port: 8082
    • Now let's start Flink – go to Flink's bin folder and run
      • ./start-local.sh
    • Now let's open the Flink Web UI –
    • Now let's look at one example from the GitHub repo – wget https://raw.githubusercontent.com/apache/flink/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
    • Here is the heart of the code:
      • You first get the streaming environment –
        • // get the execution environment
          val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      • Then you process and transform the data –
        • // parse the data, group it, window it, and aggregate the counts
          val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5), Time.seconds(1))
          .sum("count")
    • Now let's open up a port and listen on port 9000 – nc -l 9000
    • Now run the example –
      • go to the flink folder – cd /home/maria_dev/flink-1.2.0
      • now run – ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
      • Output –

Setting up a LAMP( Linux Apache MySQL PHP ) server with WordPress

  • As you can see, even this site on which I am writing my blogs is powered by WordPress.
  • It gives you a medium to write down your experiences as blogs and posts, lets others like, dislike and comment on your posts, and works efficiently on mobile devices too.
  • Setting up WordPress on Linux servers is pretty easy and documented all over the Internet, so you should not spend thousands on websites if you can do it for free or almost free (counting only the cost of registering a domain name).
  • Let's break down the task of setting all this up:
    • One, set up the LAMP stack.
    • Two, set up the WordPress website.
  • Setting up the LAMP server:
    • We will use CentOS 7 as the OS, but you can use any Linux distro.
    • The commands would be almost the same, except for which package manager you use ('apt' or 'yum') and how you start services.
    • I will highlight the differences wherever needed.
    • If you have not installed the OS, you can check one of my posts on how to install Linux.
    • So let's start:
      • Login to the server and elevate your rights to superuser/root – sudo su -
      • Install the required software. You can install Apache, MariaDB and PHP one by one or all together; I will explain both.
      • First disable SELinux – setenforce 0
      • Let's install all together – yum clean all && yum -y update && yum -y install httpd mariadb mariadb-server php php-common php-mysql php-gd php-xml php-mbstring php-mcrypt php-xmlrpc unzip wget firewalld
        • If you have Ubuntu or a Debian-based distro, then use apt-get instead of yum; some of the package names also change, e.g. httpd is replaced by apache2
          • apt-get clean all && apt-get -y update && apt-get -y install apache2 mysql-server php php-common php-mysql php-gd php-xml php-mbstring php-mcrypt php-xmlrpc unzip wget
        • Or you can do a one-by-one installation –
          • yum -y update – to update the server
          • yum -y install httpd – to install Apache
          • yum -y install mariadb mariadb-server – to install MariaDB (MySQL) and its dependencies
          • yum -y install php php-common php-mysql php-gd php-xml php-mbstring php-mcrypt php-xmlrpc – to install PHP and related dependencies
          • yum -y install unzip curl wget firewalld – to install miscellaneous software
        • Now that we have installed the required software, we need to start some services and enable them so that they always run on boot.
        • Enable and start Apache –
          • systemctl start httpd ( for Ubuntu-based systems use – service apache2 start )
          • systemctl enable httpd ( for Ubuntu-based systems use – update-rc.d apache2 enable )
        • Enable and start MySQL –
          • systemctl start mariadb ( for Ubuntu-based systems use – service mysql start )
          • systemctl enable mariadb ( for Ubuntu-based systems use – update-rc.d mysql enable )
        • Enable and start the firewall –
          • systemctl start firewalld
          • systemctl enable firewalld
        • Enable HTTP and HTTPS on the server
          • firewall-cmd --permanent --zone=public --add-service=http
          • firewall-cmd --permanent --zone=public --add-service=https
          • firewall-cmd --reload
          • systemctl restart httpd
        • Test whether Apache is working by browsing to the server's IP address.
        • Now that Apache is working and we have already installed MariaDB, we need to configure and run it.
        • To configure MySQL/MariaDB run – mysql_secure_installation
          • It will start a wizard and prompt you for the current password; simply press "Enter" because you have not set up a password yet.
          • Then it will ask – Set root password? Type 'Y' and then provide the password you want to set.
          • After that it will prompt you with 4 more questions – just type 'Y' for all, till you get a success message.
          • Now login to MySQL – mysql -u root -p
          • Use the above password to login and get the MariaDB prompt.
          • Now we have to create the following:
            • a database for WordPress – create database mywordpress;
            • a user for WordPress – create user mywpuser@localhost identified by 'mywppass';
            • grant all privileges to this user – grant all privileges on mywordpress.* to mywpuser@localhost;
            • flush the privileges so that they get applied – flush privileges;
            • then exit MySQL – exit;
          • Enable rewrite rules (needed for WordPress permalinks):
            • edit /etc/httpd/conf/httpd.conf
            • and set AllowOverride All for the DocumentRoot directory block
  • Setting Up WordPress:
    • Download the latest WordPress package – wget https://wordpress.org/latest.zip
    • Unzip the zip file – unzip latest.zip
    • View the wordpress folder created by the above action – ls -ltr wordpress
    • Now copy the wordpress folder to /var/www/html – cp -avr wordpress /var/www/html
    • Grant permissions on the wordpress folder –
      • chmod -R 755 /var/www/html/
    • Change the owner of the wordpress folder to Apache –
      • chown -R apache:apache /var/www/html/
    • Rename the file 'wp-config-sample.php' to 'wp-config.php' – mv wp-config-sample.php wp-config.php
    • Edit the file wp-config.php – nano wp-config.php – with the following details:
      • define('DB_NAME', 'mywordpress');
        define('DB_USER', 'mywpuser');
        define('DB_PASSWORD', 'mywppass');
    • We are done with the installation, let's try connecting to WordPress
      • Type –
        • http://localhost/wordpress if your host is local
        • http://ip_addr/wordpress (get the IP address by executing the ip addr command)
        • http://hostname/wordpress (get the host name by executing the hostname command)
        • You will get a setup page to enter a site name and create a user, and then you can click 'Install WordPress'.
        • You will get a success screen and a link to login – use the above user and password to login.
        • Yay!!! You have successfully installed WordPress on a LAMP stack.

Apache Storm – a more truly real-time streaming engine than Apache Spark Streaming

Apache Storm Vs Apache Spark Streaming:

  • Apache Storm – real time up to a sub-second level and is event based
  • Apache Spark Streaming – real time only up to a second level and is micro-batch processing based.

Apache Storm Architecture and terminologies:

  • It uses very specific terminology – spouts and bolts.
  • A spout is the stream receiver and a bolt is a stream processor.
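The spout → bolt flow can be mimicked with plain Python generators. This is only a sketch of the data flow (with made-up sentences), not the Storm API:

```python
from collections import Counter

def sentence_spout():
    """Spout: emits a stream of sentences (hard-coded here for illustration)."""
    for s in ["the cow jumped over the moon", "the man went to the store"]:
        yield s

def split_bolt(sentences):
    """Bolt: splits each incoming sentence into words."""
    for sentence in sentences:
        for word in sentence.split():
            yield word

def count_bolt(words):
    """Bolt: keeps a running word-count map, updated per word."""
    counts = Counter()
    for word in words:
        counts[word] += 1
    return counts

counts = count_bolt(split_bolt(sentence_spout()))
print(counts["the"])  # 4
```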
  • Exposes 2 kinds of APIs – Storm Core and Trident – and works very nicely with Kafka.
  • Unlike Spark Streaming, it provides something called a "tumbling window": tumbling windows don't overlap (e.g. three consecutive 5s windows are 3 disjoint 5-second windows), whereas "sliding windows" can overlap.
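The difference can be illustrated with a few lines of plain Python over event timestamps (a sketch, not Storm code; the 5-second window size follows the example above):

```python
def tumbling_windows(events, size):
    """Non-overlapping buckets: each event falls in exactly one window."""
    windows = {}
    for t in events:
        windows.setdefault(t // size, []).append(t)
    return windows

def sliding_windows(events, size, slide):
    """Windows starting every `slide` seconds: they overlap, so one
    event can appear in several windows."""
    result = []
    start = 0
    while start <= max(events):
        result.append([t for t in events if start <= t < start + size])
        start += slide
    return result

events = [1, 3, 6, 8, 12]
print(tumbling_windows(events, 5))   # {0: [1, 3], 1: [6, 8], 2: [12]}
print(sliding_windows(events, 5, 2)) # overlapping: e.g. event 3 shows up twice
```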

Let's play with Apache Storm:

  • Apache Storm comes pre-installed on Hortonworks, so let's start Storm and Kafka.
  • Now let's login to the Ambari console.
  • This example takes a stream of sentences and counts the words by updating a map.
  • Location of the Apache Storm client: cd /usr/hdp/current/storm-client
  • Sample examples location – cd contrib/storm-starter/src/jvm/org/apache/storm/starter
  • Let's refer to this file – https://testbucket786786.s3.amazonaws.com/WordCountTopology.java
  • Let's run it – storm jar /usr/hdp/current/storm-client/contrib/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.WordCountTopology wordcount
  • Let it run, and let's go to the browser to see the Storm UI (runs on port 8744) –
  • Now let's see what the WordCountTopology processed.
  • Let's check the logs now at: cd /usr/hdp/current/storm-client/logs/workers-artifacts
  • In this exercise the data was dumped to the log. We could have just as easily written it to HDFS or some other sink.




Spark Streaming – processing data in almost real time

Why process big data in real time?

  • Big data is really huge, so if we still use batch processing (e.g. running batch jobs every 10 minutes or so), we will have scalability issues. What if more data arrives than that job can handle?
  • Big data never stops, it keeps flowing. Imagine how much sensor data is received from IoT devices, or user-behavior data from big social websites.
  • So big data can not only be streamed live but also be processed live.
  • Technologies like Apache Spark Streaming and Apache Storm are the answer to this issue.

Spark Streaming Architecture:

  • Spark Streaming is an almost real-time (not exactly real-time) processing engine.
  • Spark receivers receive a live data stream from a multitude of sources (simple sources like a tailed web server log or a file system, or true live streams like the Twitter hose, streaming data from Kafka, etc.).
  • The data is then partitioned into RDDs: the dataset is partitioned to be processed by the Spark engine as micro-batch jobs (incremental to a timestamp).
  • The transformed data is then consumed by a multitude of tools, from HDFS to NoSQL and SQL tools, web sockets, console or memory output, etc.
  • So you can imagine that lots of mapper and reducer transformations occur.
  • It supports something called "windowed transformations".
    • While processing data you can maintain state across batches.
    • E.g. the most trending tweets or re-tweets over the last hour – you might process the data coming from the Twitter hose in DStream batches of 1s, but would like to hold state for the max number of tweets or re-tweets per user over a longer span.
    • Windowing allows you to hold data over batches: basically, a snapshot of data over the window is taken to compute the results.
    • In our case, the batch interval would be one minute, the window interval 1 hour, and the sliding interval (how often we compute the transformation) maybe every 30 minutes.
    • Just like Spark, which has a SparkContext, Spark Streaming has a StreamingContext.
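Scaled down to a plain-Python sketch (not the Spark API; batch contents are invented), a windowed count amounts to keeping per-batch counts and merging the last few batches each time a new batch arrives:

```python
from collections import Counter, deque

def windowed_counts(batches, window_batches):
    """Keep a deque of the last `window_batches` per-batch Counters and
    emit the merged count after every batch -- roughly what a windowed
    DStream transformation computes."""
    window = deque(maxlen=window_batches)
    results = []
    for batch in batches:
        window.append(Counter(batch))
        total = Counter()
        for c in window:
            total += c
        results.append(total)
    return results

# Each inner list is one micro-batch of, say, tweet authors (invented data).
batches = [["alice", "bob"], ["alice"], ["bob", "bob"]]
out = windowed_counts(batches, window_batches=2)
print(out[-1])  # counts over the last 2 batches only: bob: 2, alice: 1
```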
    • Structured Streaming:
      • just as Spark 2.0 uses Datasets while older versions use DataFrames and RDDs,
      • Spark Streaming 2.0 uses Datasets too – this is referred to as Structured Streaming, where the Dataset is basically a more explicitly typed DataFrame (of Rows)
      • So basically, instead of the data being partitioned into many RDDs, it is one DataFrame to which rows keep being added.
      • Why Structured Streaming?
        • a more efficient way of storing data
        • you can use SQL-like querying
        • MLlib (the machine learning library) also supports Structured Streaming

Let's play with Spark Streaming:

  • Simple Example – Spark Streaming with Flume:
    • Case Study:
      • There is a web server which writes web server logs.
      • A Flume agent has a 'spooldir' source to read these log files.
      • Flume then writes them to a sink.
      • The Spark Streaming cluster connects to this sink and reads the data to save it in a database, or just to console output.
    • Technicalities:
      • First of all, download the Flume conf file –
      • wget https://testbucket786786.s3.amazonaws.com/sparkstreamingusingflume.conf
      • This file is the same as in the previous lecture (Apache Flume), where we read from a spool directory and saved to an HDFS sink. But this time we change the sink from HDFS to Avro (a serialization/communication protocol), which will push the data to a port, say 9092.
      • See below for the change in the sink:

        # sparkstreamingusingflume.conf: A single-node Flume configuration
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = spooldir
        a1.sources.r1.spoolDir = /home/maria_dev/spool
        a1.sources.r1.fileHeader = true
        a1.sources.r1.interceptors = timestampInterceptor
        a1.sources.r1.interceptors.timestampInterceptor.type = timestamp

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port = 9092

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

      • Now let's get ready a Python script which will read from the Avro stream, process and transform it, and output the result.
      • Download this file – wget https://testbucket786786.s3.amazonaws.com/SparkFlumeStreaming.py
      • Explanation of the Python script (see green comments):
      • Also download this file, to later copy/drop into the spool folder – wget https://testbucket786786.s3.amazonaws.com/streaming_log.txt
      • Create a checkpoint folder under maria_dev – mkdir checkpoint
      • Open 3 more windows: one to run the Flume job, one for the Spark job, and one more to copy the log file into the spool folder
        • to run the Spark job – export SPARK_MAJOR_VERSION=2
          spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 SparkFlumeStreaming.py
        • to run the Flume job – cd /usr/hdp/current/flume-server/bin
          ./flume-ng agent --conf conf --conf-file /home/maria_dev/sparkstreamingusingflume.conf --name a1
        • to copy the file into the spool folder – cp streaming_log.txt spool/log1.txt
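What the Spark job computes here boils down to pulling the request path out of each forwarded log line and tallying it. A minimal plain-Python sketch, assuming common Apache access-log format (the sample lines are invented):

```python
import re
from collections import Counter

# In common/combined log format the request is the quoted "METHOD URL PROTOCOL".
REQUEST_RE = re.compile(r'"(?:GET|POST|PUT|DELETE|HEAD) (\S+) HTTP/[\d.]+"')

def count_urls(lines):
    """Tally the request path of every parsable log line."""
    counts = Counter()
    for line in lines:
        m = REQUEST_RE.search(line)
        if m:
            counts[m.group(1)] += 1
    return counts

sample = [
    '66.249.75.159 - - [29/Nov/2015:03:50:05 +0000] "GET /robots.txt HTTP/1.1" 200 55',
    '66.249.75.159 - - [29/Nov/2015:03:50:06 +0000] "GET /blog/ HTTP/1.1" 200 8083',
    '66.249.75.159 - - [29/Nov/2015:03:53:07 +0000] "GET /robots.txt HTTP/1.1" 200 55',
]
print(count_urls(sample))  # /robots.txt counted twice, /blog/ once
```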
      • See the snapshot below ( enlarge in another tab to see details)


Now let's see the output:

See the first window – it read and counted the URLs as soon as we dropped the file into the spool folder in window 2.


Let's drop the file a few more times and see if the values (counts) double, triple, etc.

Yes, they do.


So we have seen live streaming of data into a spool folder, where a Flume agent picks up the files and dumps them to an Avro endpoint … and then Spark Streaming listens to this stream, processes the data in a windowed-transformation style and shows you the results on the console. You could even push the data to HDFS, some other consumer, or a SQL or NoSQL database.





Apache Flume – a Hadoop-specific streaming tool

What is Apache Flume:

  • As we know, Apache Kafka is a generic streaming tool which can handle not only Hadoop-specific streaming but non-Hadoop streaming too.
  • Apache Flume is the Hadoop-specific answer for a streaming tool.

Why Apache Flume:

  • Since data streaming between the log sources and HDFS can be a spiky affair, there should be a buffer in between to handle the load, which can regulate the amount of data moving to HDFS or, in case of a load spike, store the data for a while.
  • Apache Flume is known for providing those buffers, via its channels and sinks.

Apache Flume Architecture:

  • It's made up of agents. A Flume agent listens to a data source (say web server logs), buffers events in a channel, and then delivers them to a sink. Data is pushed to HDFS from the sink.
  • Once data is processed by the sink, it is deleted.
  • Sources have connectors for a lot of source types, e.g. a tailed log file on the command line, ordinary log files, a TCP port, custom code, etc.
  • Similarly there are many sink types, e.g. HDFS, Hive, HBase, Kafka, Elasticsearch, custom code, etc.
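The source → channel → sink flow can be sketched with a bounded in-memory queue standing in for the memory channel (plain Python, not the Flume API; names and events are invented):

```python
from queue import Queue

def source(channel, events):
    """Source: pushes incoming events into the channel."""
    for e in events:
        channel.put(e)          # blocks if the channel is full -- the buffer

def sink(channel, out):
    """Sink: drains the channel; in Flume the event is removed from the
    channel once the sink has taken it."""
    while not channel.empty():
        out.append(channel.get())

channel = Queue(maxsize=1000)   # like a1.channels.c1.capacity = 1000
delivered = []
source(channel, ["log line 1", "log line 2"])
sink(channel, delivered)
print(delivered)  # ['log line 1', 'log line 2']
```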

Let's play with Flume (simple example):

  • We will set up a simple agent which listens on a specific port; any data streamed to that port via telnet will be captured by the Flume agent.
  • First let's download a Flume configuration file and try to understand it.
  • wget https://testbucket786786.s3.amazonaws.com/flume-example.conf

    # example.conf: A single-node Flume configuration ( 1 agent, a1 )
    # Name the components on this agent ( agent a1, source r1, sink k1 and channel c1; these 3 lines declare the source, sink and channel )
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source ( these 3 lines detail the source type, binding and port )
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink( this line describes the sink type )
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory( these 3 lines describe the channel type, capacity etc)
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel ( these 2 lines finally bind the source and the sink to the channel )
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

  • Now that the config file is ready, let's kick off a Flume agent
    • Flume location: cd /usr/hdp/current/flume-server/bin
    • Command to kick off the agent – ./flume-ng agent --conf conf --conf-file /home/maria_dev/flume-example.conf --name a1 -Dflume.root.logger=INFO,console
    • Now that the agent is listening on port 44444, let's telnet to it, start typing, and see if the Flume agent can read it – telnet localhost 44444

Let's play with Flume (complex example):

  • In this example we will have a spool folder; the Flume agent waits for any new files added to it, reads them and saves them into the sink.
  • Let's download – wget https://testbucket786786.s3.amazonaws.com/flumelogging.conf
  • Let's try to understand the conf file first.

    # flumelogging.conf: A single-node Flume configuration
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source ( the source type is spooldir and the spool folder is /home/maria_dev/spool; it also has a fileHeader and a timestamp interceptor which adds a timestamp to each event )
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/maria_dev/spool
    a1.sources.r1.fileHeader = true
    a1.sources.r1.interceptors = timestampInterceptor
    a1.sources.r1.interceptors.timestampInterceptor.type = timestamp

    # Describe the sink ( the sink type is hdfs with the HDFS folder /user/maria_dev/flume/%y-%m-%d/%H%M/%S; rounding is enabled with a value of 10 minutes, so events are bucketed into 10-minute intervals )
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/maria_dev/flume/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
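The %y-%m-%d/%H%M/%S escapes in the sink path are expanded from the event's timestamp header (which is what the timestamp interceptor provides). Python's strftime uses the same codes, so the expansion can be previewed like this (the example timestamp is arbitrary):

```python
from datetime import datetime, timezone

# Flume expands these escapes from the event's 'timestamp' header,
# which is why the timestampInterceptor above is required.
path_pattern = "/user/maria_dev/flume/%y-%m-%d/%H%M/%S"

# A fixed example timestamp, chosen arbitrarily for illustration.
event_time = datetime(2018, 3, 5, 14, 30, 45, tzinfo=timezone.utc)
print(event_time.strftime(path_pattern))
# /user/maria_dev/flume/18-03-05/1430/45
```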

  • Now let's create the required folders
    • Under /home/maria_dev: mkdir spool
    • Under HDFS – create a 'flume' folder: hadoop fs -mkdir /user/maria_dev/flume
  • Let's run the agent with the new conf file – ./flume-ng agent --conf conf --conf-file /home/maria_dev/flumelogging.conf --name a1 -Dflume.root.logger=INFO,console
  • Now drop some files into the folder /home/maria_dev/spool (either use a simple cp command, or use ftp etc.)
  • Yay!!! The agent read the dropped files and, using the HDFS sink, stored them in HDFS.
  • Also go to the spool folder – you will see the files suffixed with .COMPLETED, showing all the data has been transferred.




Apache Kafka – a tool for streaming data into the cluster


  • What is Streaming?
    • What if you have to capture live data or logs from web servers,
    • or data coming from the camera sensors of your security cam and other IoT devices,
    • or huge stock trading data,
    • and you need to upload these into the Hadoop cluster?
    • This is called streaming.
    • Recall that in all our previous articles, we were processing data already sitting in HDFS, Hive, some database, or maybe our local system – first bringing it into our cluster (e.g. with Sqoop) and then processing it.
  • What is Kafka?
    • It is a publisher-subscriber messaging/streaming tool.
    • It is general purpose (not only meant for big data streaming but for any streaming data), which makes it an immensely powerful streaming tool.
  • Kafka Architecture and Scaling?
    • Architecture:
      • There are Kafka publishers/producers, e.g. apps designed to read stream data from web servers, IoT devices, stock trading feeds etc.
      • The Kafka cluster has multiple servers processing this data and storing it as topics.
      • There are Kafka consumers/subscribers which are
        • subscribed to the topics.
        • Multiple subscribers can subscribe to a topic, and each subscriber's read position is tracked on the topic.
        • The subscribers can either be non-Hadoop subscribers or Hadoop subscribers like Spark Streaming, or HDFS directly, etc.
        • The topics can also be persisted into databases and read/written to and fro from there.
    • Scaling:
      • The Kafka cluster consists of multiple servers running multiple processes, for high availability.
      • Kafka can be architected to distribute data across consumer groups: multiple subscribers can register into a consumer group so that the Kafka servers distribute the data across the group.
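The key idea that each subscriber's read position is tracked per topic can be sketched in a few lines of Python. This is an in-memory stand-in for illustration, not the Kafka protocol:

```python
class Topic:
    """Append-only log with an independent read offset per consumer."""
    def __init__(self):
        self.log = []
        self.offsets = {}  # consumer name -> index of next message to read

    def publish(self, message):
        self.log.append(message)

    def consume(self, consumer, from_beginning=False):
        # A new consumer starts at the end of the log, unless it asks to
        # replay everything (like the console consumer's from-beginning flag).
        if consumer not in self.offsets:
            self.offsets[consumer] = 0 if from_beginning else len(self.log)
        messages = self.log[self.offsets[consumer]:]
        self.offsets[consumer] = len(self.log)
        return messages

t = Topic()
t.publish("old message")
late = t.consume("late-subscriber")                  # joined late: sees nothing
replay = t.consume("replayer", from_beginning=True)  # replays everything
print(late, replay)  # [] ['old message']
```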
  • Play with Kafka?
    • Let's start the Kafka service, which comes preinstalled.
    • Let's login to the Ambari console using 'maria_dev' credentials and analyse where Kafka lives and what files are there –
      • First check the host name
        • hostname
        • use the value you get wherever I am using 'sandbox-hdp.hortonworks.com'
      • Let's first check where Kafka is located
        • cd /usr/hdp/current/kafka-broker
        • you can see all folders related to Kafka
      • Now let's view the bin folder
        • cd bin
        • you can see all the Kafka .sh files
      • Now let's create a Kafka topic:
        • ./kafka-topics.sh --create --zookeeper sandbox-hdp.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic mydatalogs
        • alternatively, to delete a topic ( it only gets marked for deletion ) –
          • ./kafka-topics.sh --delete --zookeeper sandbox-hdp.hortonworks.com:2181 --topic mydatalogs
      • Now let's see the list of topics
        • ./kafka-topics.sh --list --zookeeper sandbox-hdp.hortonworks.com:2181
        • it will list all the topics
      • Now let's run the producer
        • ./kafka-console-producer.sh --broker-list sandbox-hdp.hortonworks.com:6667 --topic mydatalogs
      • Now let's run the consumer to subscribe to the topic and get the streamed data
        • ./kafka-console-consumer.sh --bootstrap-server sandbox-hdp.hortonworks.com:6667 --topic mydatalogs --from-beginning
        • use '--from-beginning' to read all messages; otherwise it will only show new messages produced after the subscriber became active
      • See the producer and the consumer in side by side screens
      • The above example was live streaming of standard input from the keyboard.
      • What if we want to live stream a web server's data log in real time? That would be a real-life example:
  • So, the Kafka real-life example:
    • You will need to use file connectors to connect to the files we want to live stream.
    • So let's see where we can configure a file connector – let's go to the folder – cd /usr/hdp/current/kafka-broker/conf
    • Let's copy these 3 files from Kafka's conf folder to the maria_dev folder
      • cp connect-standalone.properties /home/maria_dev
      • cp connect-file-source.properties /home/maria_dev
      • cp connect-file-sink.properties /home/maria_dev
    • Let's edit 'connect-standalone.properties' to update the line
      • bootstrap.servers=sandbox-hdp.hortonworks.com:6667
    • also update 'connect-file-sink.properties' ( the file here is the target file where the streaming data will be written )
      • file=/home/maria_dev/mylogdata.txt
    • also update 'connect-file-source.properties' ( the file here is the source log file which will be read by the producer )
      • file=/home/maria_dev/mysourlogs.log
    • Let's run a consumer first, which will listen for any logs
      • ./kafka-console-consumer.sh --bootstrap-server sandbox-hdp.hortonworks.com:6667 --topic mylogdata
    • Now finally run the connector/producer ( we will execute connect-standalone.sh, passing the standalone, source and sink properties files as parameters )
      • ./connect-standalone.sh /home/maria_dev/connect-standalone.properties /home/maria_dev/connect-file-source.properties /home/maria_dev/connect-file-sink.properties
      • Yay!!! You can see that once we ran the connector, it published the log lines to the topic and the subscriber read the topic to show you the data from the source log.
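Conceptually, the file source connector tails the source file and publishes each newly appended line, remembering its read offset between polls. A rough Python sketch of that bookkeeping (a temp file stands in for mysourlogs.log):

```python
import os
import tempfile

def read_new_lines(path, offset):
    """Return the lines appended since `offset`, plus the new offset --
    roughly the bookkeeping a file source connector does between polls."""
    with open(path) as f:
        f.seek(offset)
        lines = f.read().splitlines()
        return lines, f.tell()

# Demonstrate with a temp file standing in for the source log.
path = os.path.join(tempfile.mkdtemp(), "mysourlogs.log")
with open(path, "w") as f:
    f.write("first log line\n")
lines, offset = read_new_lines(path, 0)

with open(path, "a") as f:
    f.write("second log line\n")          # the "live" append
new_lines, offset = read_new_lines(path, offset)
print(lines, new_lines)  # ['first log line'] ['second log line']
```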