MongoDB – A Deep Dive

  • The previous session on MongoDB covered Big Data integration with MongoDB and how to use Spark and Python to move data back and forth between HDFS and MongoDB. We saw how we can process/analyse the data using Big Data technologies and then save it in MongoDB to be used by different kinds of applications – https://mohdnaeem.wordpress.com/2018/01/17/big-data-integration-with-mongodb-using-spark/.
  • In this session the focus is on deep diving into MongoDB: installing it as a standalone server on a Linux system, assessing its different powers and features, and analysing its CRUD operations, aggregation, indexing, security, replication, sharding, etc.
  • Below is the list of topics we will try to cover:
    • Why MongoDB?
      • open source
      • document(JSON) based – supported across platforms and languages
      • high availability – embedded document structure decreases IO, fail-over support and replica sets support high availability
      • high performance and persistence – indexing makes querying faster
      • auto scaling – replication and sharding make it easy to scale out automatically
      • non-relational database – no expensive joins or IO-heavy query plans
    • Installing MongoDB
      • Spin up a Linux machine using VirtualBox or VMWare (I am using the CentOS 7 Distro).
      • Using SSH, log in to the server and elevate to root privileges.
      • To install MongoDB, first set up a MongoDB repository
        • Create a ‘mongodb3.6.repo’ repository file under ‘/etc/yum.repos.d’
          • nano /etc/yum.repos.d/mongodb3.6.repo
        • edit the file to add the below lines and save the file:
          • [mongodb-org-3.6]
            name=MongoDB Repository
            baseurl=https://repo.mongodb.org/yum/amazon/2013.03/mongodb-org/3.6/x86_64/
            gpgcheck=1
            enabled=1
            gpgkey=https://www.mongodb.org/static/pgp/server-3.6.asc
        • Now update the server and install MongoDB:
          • yum -y update && yum install -y mongodb-org
          • Start MongoDB – service mongod start
          • Re-Start MongoDB – service mongod restart
          • Stop MongoDB – service mongod stop
          • Enable MongoDB – chkconfig mongod on
          • Start Mongo shell – mongo
          • Import Data into MongoDB
            • Download this JSON dataset
            • Use MongoDB Import tool to import data
              • mongoimport --db myTestDB --collection restaurants --file /home/user/primer-dataset.json
            • To log in to the Mongo shell – mongo
            • To show the databases – show databases
            • To switch to a specific database – use myTestDB
            • To show the list of collections – show collections
            • A basic search query to find all records from restaurants and limit to show only one and make output pretty – db.restaurants.find().limit(1).pretty()
            • to check server status : db.serverStatus()
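            • The same queries can also be issued from Python with pymongo (which we install later for the Spark integration). Below is a minimal sketch, assuming mongod is running locally on the default port 27017 and the primer dataset was imported into myTestDB.restaurants as above:
              # minimal PyMongo sketch (assumes mongod on 127.0.0.1:27017 and the imported myTestDB.restaurants collection)
              from pymongo import MongoClient

              client = MongoClient("mongodb://127.0.0.1:27017")
              db = client["myTestDB"]

              # same as db.restaurants.find().limit(1).pretty() in the mongo shell
              for doc in db.restaurants.find().limit(1):
                  print(doc)

              # rough equivalent of db.serverStatus()
              print(client.admin.command("serverStatus")["version"])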
    • CRUD operations:
      • Insert Documents:
        • Use the following commands –
          • db.Students.insertOne()
          • db.Students.insertOne( { "name" : { "first name" : "mohd", "last name" : "naeem" }, "age" : "35", "address" : { "address 1" : "123 Best way", "city" : "dallas", "state" : "tx", "zip" : "75039" } } )

          • db.Students.insertMany()
          • db.Students.insertMany([ { "name" : { "first name" : "bob", "last name" : "martin" }, "age" : "37", "address" : { "address 1" : "123 Ant St", "city" : "plano", "state" : "tx", "zip" : "75056" } }, { "name" : { "first name" : "jacob", "last name" : "tran" }, "age" : "38", "address" : { "address 1" : "1092 allen St", "city" : "allen", "state" : "tx", "zip" : "75079" } } ])

      • Query Documents:
        • find all documents : db.Students.find()
        • limit to 5 results : db.Students.find().limit(5)
        • beautify the output : db.Students.find().pretty()
        • Use "IN" operator : db.Students.find( { "address.city" : { $in : ["allen","plano"] } } )
        • Use "AND" operator : db.Students.find( { "address.city" : { $eq : "allen" }, "address.state" : { $eq : "tx" } } )
        • Use "OR" operator : db.Students.find( { $or: [ { "address.city": "allen" }, { age: { $gt: "20" } } ] } )
        • Use ‘AND’ and ‘OR’ both :
          • db.Students.find( {
            "address.state": "tx",
            $or: [ { age: { $gt: "20" } }, { "address.city": "allen" } ]
            } )
        • Show or hide a column using 1 or 0:
          • db.Students.find( { "address.state": "tx" }, { "name": 1, "age": 1, "_id": 0 } ) # shows name and age but hides '_id'
          • db.Students.find( { "address.state": "tx" }, { "name": 1, "age": 1, "address.city": 1, "address.state": 1, "_id": 0 } ) # specifically shows name, age, city, state and hides '_id'
          • db.Students.find( { "address.state": "tx" }, { "_id": 0 } ) # shows all columns except '_id'
        • Iterate through a result cursor:
          • var iter = db.Students.find( { "address.city": "allen" } );
            while (iter.hasNext()) {
              printjson(iter.next());
            }
        • To check the database stats : db.stats()
        • To create a new database : use databasename
        • To drop a database : db.dropDatabase()
      • Update Documents :
        • the following update methods are supported:
          • update one document : db.students.updateOne()
          • update many documents : db.students.updateMany()
          • replace one document : db.students.replaceOne()
          • db.students.findOneAndReplace()
          • db.students.findOneAndUpdate()
          • db.students.findAndModify()
          • db.students.save()
          • db.students.bulkWrite()
        • Let's see a few examples:
        • use testdb
          db.students.find()
          db.students.updateOne( { "_id": ObjectId("5a62e4bb46f31613eb74604b") }, { $set: { "age": "42" } } )
          db.students.updateMany( { "address.state": "tx" }, { $set: { "age": "43" } } )
          db.students.updateOne( { "_id": ObjectId("5a62e4bb46f31613eb74604b") }, { $set: { "age": "47" } } )
          db.students.replaceOne( { "_id": ObjectId("5a62e4bb46f31613eb74604b") }, { "name" : { "first name" : "Mohd", "last name" : "Naeem" }, "age" : "49", "address" : { "address 1" : "123 Best Way", "city" : "Dallas", "state" : "TX", "zip" : "75039" } } )
      • Delete documents :
        • Methods supported:
          • db.collection.deleteOne()  – deletes one document
          • db.collection.deleteMany() – deletes multiple documents
          • db.collection.remove() – deletes all documents that match the filter (or just one if the justOne option is set)
        • Let's see a few examples:
        • use testdb
          db.students.find()
          db.students.deleteOne( { "_id": ObjectId("5a63d7ff611fcb0edc56a968") } )
          db.students.remove( { "_id": ObjectId("5a63d90b611fcb0edc56a969") } )
          db.students.deleteMany( { "name.last name": "Tomg" } )
    • Aggregations:
      • functions supported:
        • db.collection.aggregate([{$group : {_id : "$address.city", totalage : {$sum : "$age"}}}])
        • db.collection.aggregate([{$group : {_id : "$address.city", avgage : {$avg : "$age"}}}])
        • db.collection.aggregate([{$group : {_id : "$address.city", minage : {$min : "$age"}}}])
        • db.collection.aggregate([{$group : {_id : "$address.city", maxage : {$max : "$age"}}}])
        • db.collection.aggregate([{$group : {_id : "$address.city", firstage : {$first : "$age"}}}])
        • db.collection.aggregate([{$group : {_id : "$address.city", lastage : {$last : "$age"}}}])
      • Let's see some examples:
      • use testdb
        db.students.find()
        db.students.count()
        db.students.distinct("name.first name")
        db.students.find().sort({"age" : -1})
        db.students.aggregate([{$group : {_id : "$address.city", averageAge : {$avg : "$age"}}}]) // note: $avg only returns a value when age is stored as a number
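      • Note that because age was inserted as a string in these sample documents, numeric accumulators such as $avg return null unless the ages are stored as numbers. As a hedged illustration, here is a minimal pymongo sketch (assuming the testdb.students collection created above) that groups by city and counts students, which works regardless of the age type:
        # minimal PyMongo aggregation sketch (assumes the testdb.students documents created above)
        from pymongo import MongoClient

        students = MongoClient("mongodb://127.0.0.1:27017")["testdb"]["students"]

        # count students per city; $sum: 1 works even though age is stored as a string
        pipeline = [{"$group": {"_id": "$address.city", "studentCount": {"$sum": 1}}}]
        for row in students.aggregate(pipeline):
            print(row)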
    • Indexing:
      • Indexes help in efficient execution of a query.
      • If no indexes are present, MongoDB performs a collection scan, meaning it scans every document in the collection
      • If a proper index exists, it uses the index to limit the number of documents it must inspect.
      • To create an index on the age field in ascending order
        • db.students.createIndex( { age: 1 } )
      • To create an index on the age field in descending order
        • db.students.createIndex( { age: -1 } )
      • How to evaluate the index plan:
        • See how a full collection scan is used before indexing:
        • db.students.find().limit(3).sort({"age" : -1}).explain()
        • Now let's create the index – db.students.createIndex( { age: -1 } )
        • Now let's check the query plan; you can see that it is now using the index – "stage" : "IXSCAN", "keyPattern" : { "age" : -1 }, "indexName" : "age_-1"
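        • The same index creation and plan inspection can also be scripted with pymongo; a minimal sketch, assuming the testdb.students collection used above:
          # minimal PyMongo indexing sketch (assumes the testdb.students collection above)
          import pymongo
          from pymongo import MongoClient

          students = MongoClient("mongodb://127.0.0.1:27017")["testdb"]["students"]

          # equivalent of db.students.createIndex( { age: -1 } )
          students.create_index([("age", pymongo.DESCENDING)])

          # inspect the winning plan; it should show an IXSCAN stage once the index exists
          plan = students.find().sort("age", -1).limit(3).explain()
          print(plan["queryPlanner"]["winningPlan"])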
    • Replication and Sharding:
      • Replication:
        • provides high availability through redundancy
        • In MongoDB, the group of mongod processes that maintain the same data set is called a replica set.
        • With replication, the primary replicates the data to the secondaries
      • Sharding:
        • With humongous data it is a challenge for a single primary to handle all the data.
        • Sharding breaks the data into smaller pieces: each 'shard' is a replica set holding a subset of the data based on the 'shard key', and config servers keep track of which shard holds which portion of the data.
        • Sharding increases throughput because the read and write workload is shared across multiple shards.
      • A deep dive into Replication and Sharding will be part of another blog post; a small sketch follows below.
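      • As a small taste before that post, here is a minimal, hedged sketch of initiating a three-member replica set from Python. It assumes three mongod instances have already been started with --replSet rs0 on the hypothetical ports below; the sharding commands in the comments would be run against a mongos router, not a plain mongod:
        # minimal sketch: initiate a replica set named rs0
        # (assumes three mongod instances already started with --replSet rs0 on these hypothetical ports)
        from pymongo import MongoClient

        client = MongoClient("mongodb://127.0.0.1:27017")
        config = {
            "_id": "rs0",
            "members": [
                {"_id": 0, "host": "127.0.0.1:27017"},
                {"_id": 1, "host": "127.0.0.1:27018"},
                {"_id": 2, "host": "127.0.0.1:27019"},
            ],
        }
        client.admin.command("replSetInitiate", config)

        # sharding is enabled against a mongos router, e.g.:
        # mongos_client.admin.command("enableSharding", "myTestDB")
        # mongos_client.admin.command("shardCollection", "myTestDB.restaurants", key={"_id": "hashed"})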
    • Security and Administration:
      • A deep dive into security and administration will be part of another blog post; a small sketch follows below.
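      • As a small preview, here is a minimal, hedged sketch of creating an administrative user with pymongo; the user name and password below are hypothetical, and enabling authorization additionally requires setting security.authorization: enabled in /etc/mongod.conf and restarting mongod:
        # minimal sketch: create an admin user (run before enabling authorization in /etc/mongod.conf)
        from pymongo import MongoClient

        admin_db = MongoClient("mongodb://127.0.0.1:27017")["admin"]
        admin_db.command(
            "createUser",
            "siteAdmin",        # hypothetical user name
            pwd="changeMe123",  # hypothetical password
            roles=[{"role": "userAdminAnyDatabase", "db": "admin"}],
        )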

Big Data Integration with MongoDB using Spark

  • Why MongoDB? :
    • Let's evaluate MongoDB against the CAP theorem to answer 'Why MongoDB'
      • Partition tolerance is a MUST in Big Data scenarios where we are dealing with humongous data – MongoDB is a good fit as it supports sharding and can be scaled out easily
      • Consistency vs Availability – MongoDB favours consistency over availability. MongoDB, just like HBase, has a master (primary), so data is consistent; but what if the master goes down? Well, a secondary node can be elevated to master
    • MongoDB does not enforce a schema and is not relational, so there are no expensive joins, unlike relational databases such as MySQL and MariaDB
    • Unlike Cassandra, it does not even need an explicit primary key, as it automatically adds a unique ObjectId (_id) to each document
    • It is JSON (BSON) based, so it is very fast:
      • {
           _id: ObjectId(7df78ad8902c),
           movie_title: 'Speed', 
           movie_description: 'It is an action drama movie',
           ratings: [	
              {
                 userName:'Mohd Naeem',
                 ratingMessage: 'This is the best action movie',
                 dateCreated: new Date(2018,1,16,20,35),
                 rating: 5
              },
              {
                 userName:'Maria',
                 ratingMessage: 'Really liked this movie a lot',
                 dateCreated: new Date(2018,1,16,20,45),
                 rating: 4
              }
           ]
        }
  • MongoDB Architecture:
    • It consists of Database, Collections, Documents, Fields.
    • It consists of a single master and many secondary nodes
    • Supports indexes and sharding (splitting data across replica sets) based on a shard key
  • MongoDB Installation:
    • Log in to the Ambari sandbox with 'maria_dev' credentials and elevate to root.
    • If you are installing MongoDB on any other stack like Cloudera or Map-R or Custom Hadoop setup, then you might need to install it standalone.
    • But for Hortonworks Ambari there is already a connector that can be used to install it; please follow these steps:
      • change directory to –
        • cd /var/lib/ambari-server/resources/stacks/HDP/2.5/services
      • now execute :
      • restart Ambari service:
        • service ambari restart
      • Now login to the Ambari Dashboard:
        • In the bottom left side of main Ambari Dashboard, select “Add Service” from the “Actions” drop down
        • Now select “MongoDB” from the list of services and Press Next
        • Since we have a single-node sandbox, we will leave everything else as default in the rest of the wizard; if you have a multi-node setup, you can selectively configure the nodes.
        • Keep pressing 'Next', and in the last steps press 'Proceed Anyway' and 'Deploy'. Press 'Complete' once the deploy succeeds.
        • MongoDB is installed, up and running.
  • MongoDB Integration:
    • Install a few dependencies:
      • Install pymongo – pip install pymongo
      • If pip is not installed – yum -y install python-pip
      • Now you are ready.
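      • A quick way to confirm that pymongo can reach the MongoDB instance Ambari installed (assuming it listens on the sandbox's default port 27017) is a small smoke test like this sketch:
        # quick smoke test (assumes MongoDB is listening on 127.0.0.1:27017)
        from pymongo import MongoClient

        client = MongoClient("mongodb://127.0.0.1:27017", serverSelectionTimeoutMS=3000)
        print(client.admin.command("ping"))      # should print a document with 'ok': 1.0
        print(client.server_info()["version"])   # MongoDB server version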
    • Let's download the Python script from here – https://s3.amazonaws.com/testbucket786786/SparkMongoDBIntegration.py
    • Explanation of code:
      • The code is similar to the Cassandra integration; the main difference is that the driver changes from the Cassandra one to the MongoDB one.
      • The below code section will create a Spark session, pull the data from HDFS, process each line to split it into a Row and then map it into an RDD and finally to a DataFrame
      • # SparkSession creation
        sprkSession = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()
        # Getting data from HDFS user files
        theLines = sprkSession.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
        # process each line and map into an RDD using a structured Row (userID, age, gender, occupation, zip)
        usersData = theLines.map(processLines)
        # Converting the RDD into a DataFrame
        usersDataset = sprkSession.createDataFrame(usersData)

      • the save method writes data into MongoDB and the load method reads data from MongoDB:

        # Now push data into MongoDB
        usersDataset.write\
        .format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri","mongodb://127.0.0.1/movielens.usersData")\
        .mode('append')\
        .save()

        # Now read data from MongoDB
        readUsers = sprkSession.read\
        .format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri","mongodb://127.0.0.1/movielens.usersData")\
        .load()

         

      • The code below creates a temporary view and uses a Spark SQL statement to read the data:
      • # create a view
        readUsers.createOrReplaceTempView("usersData")
        # use spark sql to read data
        sprkSql = sprkSession.sql("SELECT * FROM usersData WHERE age < 18")
        sprkSql.show()

      • Now let's execute the script and verify the data:
        • Set the Spark version to 2:
          • export SPARK_MAJOR_VERSION=2
        • Execute now (note the new Mongo Spark connector package):
          • spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 SparkMongoDBIntegration.py
        • Yay!!! MongoDB Spark integration succeeded.

           

        • Let's verify from the MongoDB command line too:
        • To run the MongoDB command line – mongo
        • To use a database : use movielens
        • To find a row (db.collectionname.find(condition)) : db.usersData.find({user_id:100})
        • To explain the execution plan
          • db.usersData.explain().find({user_id:100})
        • To index the collection (creating an index on the user_id field; 1 means ascending order):
          • db.usersData.createIndex({user_id:1})
        • Count of rows in UsersData collection – db.usersData.count()
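        • The same verification can also be scripted with pymongo instead of the mongo shell; a minimal sketch, assuming the Spark job above wrote to movielens.usersData and that the field is named user_id as in the shell query above:
          # minimal PyMongo check of the data written by the Spark job (assumes movielens.usersData)
          from pymongo import MongoClient

          users = MongoClient("mongodb://127.0.0.1:27017")["movielens"]["usersData"]

          print(users.find_one({"user_id": 100}))  # same as db.usersData.find({user_id:100})
          print(users.count_documents({}))         # total documents (pymongo 3.7+; older versions can use users.count())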
  • MongoDB Advanced Topics:

Big Data Integration with Cassandra using Spark

  • Why Cassandra:
    • Before we discuss Cassandra, we also have to discuss something called the CAP theorem. As per the CAP (Consistency, Availability and Partition tolerance) theorem, "you can achieve at most 2 of the 3 for a system"…
      • Consistency – means that if you write some data, the system should consistently give that data back as soon as possible. Consistency can be 'read after write' or 'eventual'
        • read after write : you can read the data as soon as you write it
        • eventual : there is a lag; the data is guaranteed to be readable, but perhaps only after a second or two.
      • Availability – means the system should always be available, no matter what, using master-slave, live-backup or primary-secondary mechanisms. Even if one or a few nodes in a fleet go down, it is still available through the other nodes.
      • Partition tolerance – means that the system should be highly distributed, scalable and partitionable.
      • Cassandra is a NoSQL/non-relational database with SQL-like syntax.
      • Let's draw a CAP model and see how different data access technologies fare:
        • Cassandra – has tunable consistency (more eventual, but can be tuned), is highly distributed and highly available

           

  • Cassandra Architecture:
    • It is a NoSQL distributed database with SQL-like commands, called CQL.
    • CQL is similar in syntax to SQL but has limitations, such as:
      • No joins are supported, so all data must be denormalized
      • Each table must have a primary key
      • Databases in Cassandra are called keyspaces
      • The command-line interface is called CQLSH
    • DataStax is a connector for Cassandra + Spark
      • allows Spark to use DataFrames to read data from and write data to Cassandra tables (a sketch follows below)
      • can be used in the following use cases:
        • data transformed in Spark is saved in Cassandra to be viewed by various presentation tools
        • data stored in Cassandra can be pulled into Spark for analysis
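      • Here is a minimal, hedged sketch of both use cases with the connector's DataFrame API. It assumes spark-submit was given the spark-cassandra-connector package and spark.cassandra.connection.host (as shown later in this post), and that the keyspace movielens and table usersdata already exist; Cassandra lower-cases unquoted identifiers, hence the lowercase names:
        # minimal sketch: write to and read from Cassandra via the DataStax connector
        from pyspark.sql import SparkSession, Row

        spark = SparkSession.builder.appName("CassandraSketch").getOrCreate()

        # a tiny sample DataFrame matching the usersdata table columns
        users = spark.createDataFrame([Row(userid=0, age=33, gender="M", occupation="engineer", zip="75039")])

        # write the DataFrame into Cassandra
        users.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table="usersdata", keyspace="movielens") \
            .mode("append") \
            .save()

        # read it back into Spark for analysis
        readBack = spark.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table="usersdata", keyspace="movielens") \
            .load()
        readBack.show()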
    • [Figure: Cassandra Simplified Architecture]
  • How to install Cassandra:
    • It is not part of the Hortonworks Ambari or Cloudera clusters
    • It needs to be installed either via Docker or manually
    • Log in to the Ambari sandbox using 'maria_dev' credentials and elevate to the root user.
    • First update the sandbox – yum -y update
    • In case you get update issues  – move the sandbox.repo file from  /etc/yum.repos.d folder to /tmp folder like this:
      • mv /etc/yum.repos.d/sandbox.repo /tmp
    • Install tools to install multiple versions of Python  and use them without breaking each other –
      • yum -y install scl-utils – to install scl utilities
      • yum -y install centos-release-scl-rh – to install centos related scl utilities
      • yum -y install python27 – to install python 2.7
      • scl enable python27 bash – to enable python 2.7
    • Create a repository for datastax as : nano /etc/yum.repos.d/datastax.repo
    • Now you are ready to install Cassandra: yum -y install dsc30
    • Now let's install some dependencies for its CQLSH : yum -y install sqlsh and pip install cqlsh
    • Now let's start the Cassandra service – service cassandra start
    • Now let's start the CQLSH command line – cqlsh --cqlversion='3.4.0'
    • Once you are in CQLSH shell now you can create a Keyspace and then tables.
    • First create a keyspace movielens – CREATE KEYSPACE movielens WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : '1'} AND durable_writes = true;
    • Now use the movielens keyspace : use movielens;
    • Now create table : CREATE TABLE usersData (userID int, age int, gender text, occupation text, zip text, PRIMARY KEY(userID));
    • View Empty table – SELECT * FROM usersData;
    • View table structure – DESCRIBE usersData;
    • Wow, we created a table too. Now we will use Spark to push data into this table.
    • Please download script https://s3.amazonaws.com/testbucket786786/SparkCassandraIntegration.py
    • Let's execute the script – spark-submit --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 SparkCassandraIntegration.py --conf spark.cassandra.connection.host="127.0.0.1"
    • Let's verify that the data was written to the table: log back into cqlsh, use the movielens keyspace and do a SELECT.
    • To stop the Cassandra service – service cassandra stop

How to interact with HDFS using HBase and Pig

  • Interacting with HDFS using HBase and Python was very powerful, but it was also very involved, as we had to do a lot of things in Python to access the data.
  • HBase and Pig reduce the same job to very few lines.
  • Let's try HBase and Pig in action:
    • Here is the code snippet ( link – https://s3.amazonaws.com/testbucket786786/hbase.pig )
    • #first load the data file using LOAD command with pipe delimiter
      usersData = LOAD '/user/maria_dev/ml-100k/u.user'
      USING PigStorage('|')
      AS (userID:int, age:int, gender:chararray, occupation:chararray, zip:int);
      #now STORE this usersData into HBase with the below column family
      STORE usersData INTO 'hbase://usersData'
      USING org.apache.pig.backend.hadoop.hbase.HBaseStorage (
      'userInfo:age,userInfo:gender,userInfo:occupation,userInfo:zip');
    • How simple and easy it is using Pig now – only 2 commands: first LOAD and then STORE.
    • Steps to execute:
      • Login to hbase shell first – hbase shell
      • List to check if the table already exists – list
      • Create a table 'usersData' with a column family 'userInfo' – create 'usersData', 'userInfo'
      • exit out of the shell – quit
      • Now, from the maria_dev folder on the local sandbox (in super-user mode), run the Pig file – pig hbase.pig
      • [Screenshot: pig hbase.pig executed]

         

      • Now let's verify whether the data was properly added to the table
      • To see if table exists – list
      • To see the data in the table – scan ‘usersData’
      • Yay!!! We uploaded data into HBase using Pig. E.g., for userID 99 in column family userInfo we have 4 cells of data, each with a timestamp (so we can manage versions too), and the age, gender, occupation and zip columns reside in the column family.
      • Disable and drop a table – disable 'usersData' and drop 'usersData'

How to interact with HDFS using HBase and Python

  • What is HBase:
    • HBase is a NoSQL/non-relational answer to your big data queries, for cases where relational databases can't scale the way non-relational ones can.
    • It provides fast random access to data in HDFS and presents the data as key-value pairs.
    • You can use a REST service which sits on top of HBase and accomplish data access and management using CRUD operations.
  • HBase Architecture:
    • HBase sits on top of HDFS and consists of region servers (these are not physical 'regions'; each such fleet of servers holds specific blocks of data, much like HDFS distributes blocks across nodes).
    • There is also a fleet of 'HBase Master' servers which hold the information about which region server is holding which data
    • And there is ZooKeeper, which watches the watchers (the HBase Masters): in case one of the masters goes down, ZooKeeper knows which master to talk to
    • [Figure: HBase Simplified Architecture]

       

  • How to use HBase:
    • There are multiple ways to access HDFS via HBase: the HBase APIs, the REST service, etc.

       

    • How to make HBase work:
      • For clients to access it, we also need to set up port forwarding to the HBase REST port 8000, as below
      • In the VM settings for the Hortonworks Ambari cluster, select 'Network', and on the NAT network adapter's tab select 'Port Forwarding'.
      • First start the HBase service as below:
      • Select the HBase service, and from 'Service Actions' click 'Start'.
      • Then start the HBase REST service as below:
      • Run the HBase REST service using the command – /usr/hdp/current/hbase-master/bin/hbase-daemon.sh start rest -p 8000 --infoport 8001

         

         

  • How to run the client for data access using CRUD operations:
    • Please download this script to your local machine from https://s3.amazonaws.com/testbucket786786/RatingsDataUsingHBaseREST.py, as well as the u.data file from https://s3.amazonaws.com/testbucket786786/u.data
    • #if not installed, first install starbase on your local system using pip install starbase (if pip is also not installed then use yum -y install python-pip or apt-get -y install python-pip based on your distro)
      from starbase import Connection
      #now create a connection to the HBase REST service running at port 8000 on top of HBase
      conn = Connection("192.168.1.88", "8000")
      print "Connected to 192.168.1.88 on port 8000\n"
      #create a table ratingstest
      ratingstest = conn.table("ratingstest")
      #if ratingstest table exists then drop it
      if(ratingstest.exists()):
               print "Dropping existing ratingstest table\n"
               ratingstest.drop()
      #now create a ratings column family – HBase stores data as key value pairs where the value can be a column family
      ratingstest.create("ratings")
      #now open the source data file u.data in read mode; note that this file is on your local machine, not the Ambari cluster
      print "Parsing ml-100k ratings data..."
      ratingsFile = open("/home/mnaeem/Dumps/bigdata/datasets/ml-100k/u.data", "r")
      #the starbase interface has not only single-row commands but also batch commands – and both have CRUD methods
      theBatch = ratingstest.batch()
      #read the ratings local file
      #batch-insert into the ratingstest table using a key value pair where the value is a column family holding the movie id and rating for that user
      for theline in ratingsFile:
                 (userID, movieID, rating, timestamp) = theline.split()
                 theBatch.insert(userID, {'ratings': {movieID: rating}})
      #now close the local file
      ratingsFile.close()
      #now commit the changes via the batch into the ratingstest table
      print "Committing ratings data to HBase via REST service\n"
      theBatch.commit(finalize=True)
      # retrieve data from HBase
      print "Getting data back from HBase...\n"
      print "Does ratingstest table exist :\n"
      print ratingstest.exists()
      print "Ratings for User 6 : \n"
      print ratingstest.fetch(6)
      print "Ratings for User 200 : \n"
      print ratingstest.fetch(200)
      print "Ratings for all users: \n"
      print ratingstest.fetch_all_rows()
      #finally drop the ratingstest table
      ratingstest.drop()

       

    • Explanations:
      • The starbase wrapper for the HBase REST service provides the Connection module – from starbase import Connection
      • To connect to the HBase REST service, specify the host IP and the port which we configured for port forwarding – conn = Connection("192.168.1.88", "8000")
      • To get a handle to a table – ratingstest = conn.table("ratingstest")
      • To list all existing tables – conn.tables()
      • To add a column to a table – tablename.create("colname") or tablename.add_column("colname")
      • To drop a column – tablename.drop_column("colname") or tablename.remove("colname")
      • To drop a table – ratingstest.drop()
      • All other commands are enlisted here – https://pypi.python.org/pypi/starbase/0.3.3
    • So now let's execute and see HBase in action
    • Yay!!! We connected to the HBase REST service, created a table 'ratingstest', inserted the movie ratings per userID in a batch, and finally retrieved the data from HBase

Exchanging Data between MySQL and Hadoop using Sqoop Import and Export

  • The distributed Hadoop file system can retrieve data not only from flat files but also from structured as well as unstructured sources.
  • Today we will exchange data (both import and export) between MySQL and Hadoop.
  • Step 1 – First load data into MySQL from some external source
    • Connect to the Ambari Sandbox using ‘maria_dev’ credentials
    • Now use the wget command to download a SQL script which creates a database and puts data into respective tables
    • wget https://s3.amazonaws.com/testbucket786786/movielens.sql
    • [Screenshot: movielens.sql downloaded from https://s3.amazonaws.com/testbucket786786]
    • In Ambari, MySQL is already pre-installed; if not, you might need to install MySQL first. The credentials of the pre-installed MySQL in Ambari/Hortonworks are root with password hadoop.
    • Let’s connect to MySQL and create a ‘movielens’ database and run our SQL script to create all the tables and data for this exercise.
    • The mysql -u root -p command prompts you for the password to connect to MySQL; then you can check the list of databases using show databases. Now create a database movielens using create database movielens and finally switch to that database using use movielens.

       

    • Now let's execute the downloaded SQL script to generate the data.
    • You should set the names and character set to UTF8 because the data might contain UTF8 characters. Use the commands set NAMES 'UTF8' and set CHARACTER SET UTF8. Then finally execute the downloaded SQL script to generate the tables and data using source movielens.sql;

       

    • Verify that MySQL now has the data needed, and run a SQL query to check for the most-rated movies.
    • The show tables; command lists all tables in the database, describe movies; shows the schema of the movies table, and so on.

       

    • The query and the results:
      select movies.title, count(ratings.movie_id) as ratingCount, avg(ratings.rating) as avgRating
      from movies
      join ratings on movies.id = ratings.movie_id
      group by ratings.movie_id
      having ratingCount > 10
      order by avgRating desc
      limit 10;

  • Step 2 : Import Data from MySQL to HDFS
    • We use Sqoop Import Command for importing data into HDFS
    • First important step – grant all privileges on the movielens database. Run this command in MySQL and then exit the MySQL shell using the exit command
      • grant all privileges on movielens.* to ''@'localhost';
    • The command breakdown is like this –
      • sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies -m 1
        • --connect jdbc:mysql://localhost/movielens specifies your MySQL database connection
        • --driver com.mysql.jdbc.Driver – specifies the driver
        • --table movies specifies which table to import
        • -m 1 – specifies that we want to use only one mapper. For large datasets we can use more.
        • [Screenshot: Sqoop import in action]
        • Wow… the result is success, with 1682 rows imported
        • Once it succeeds, we will log into the Ambari Files view to see whether the files have been loaded.
        • Great, we imported data from MySQL to HDFS
  • Step 3 : Import Data from MySQL to Hive
    • Let's see how we can import directly into Hive.
    • sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies -m 1 --hive-import

      The only difference is that we added the flag --hive-import
    • Yay!!! The data is imported directly into Hive.
  • Step 4: Export Data from Hive to MySQL
    • Let's do the opposite: export data from HDFS to MySQL
    • One precondition is that the table must already exist in MySQL, as Sqoop export does not create tables at the destination
    • Use this command to create a new empty table from movies table
      • create table moviestab as select * from movies where 1=0;
      • this will create a table moviestab which is empty, as the condition 1=0 does not return any rows, but it definitely copies the structure.
      • The command to export data is :
      • sqoop export --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table moviestab -m 1 --export-dir /apps/hive/warehouse/movies --input-fields-terminated-by '\0001'
      • Note the extra flags: --export-dir /apps/hive/warehouse/movies specifies the source directory, and --input-fields-terminated-by '\0001' specifies the field delimiter used by Hive
      • Let's execute and get the results.
      • [Screenshot: Sqoop export in action]

         

      • Yay!!! The Sqoop export succeeded. Let's now check from MySQL.

         

      • Yay!!! We succeeded in exporting data from HDFS back into MySQL, into the newly created empty table moviestab, and selected 5 rows to verify.

         

      • Hope this all helped you.

How to process data to recommend movies for a specific user (using Machine Learning and Spark 2)

  • As Spark 2 supports Datasets/DataFrames, which are an extension of RDDs, we can feed these datasets into a machine learning model and get the results back as recommendations
  • To get into action, let's make a small modification to the u.data file – let's assume that there is a hypothetical userID '0' which has rated 'Star Wars' and 'The Empire Strikes Back' as 5 but 'Gone with the Wind' as 1.0.
  • These 3 ratings are added to the top of the u.data file.

     

  • Now upload the file to the HDFS location so that we have an updated u.data file with these 3 extra ratings by userID 0
  • Here is the code – https://s3.amazonaws.com/testbucket786786/MovieRecommendationsALS.py
  • Here is the snapshot of the code with explanations
  • This code uses an ALS (Alternating Least Squares) collaborative filtering model for recommendations
  • # Create an ALS collaborative filtering model from the complete data set
    alsColl = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
    model = alsColl.fit(movieRatingsDataset)
  • and
  • # Run our model on that list of popular movies for user ID 0
    recommendations = model.transform(popularMovies)
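  • Not the exact downloaded script, but a minimal sketch of the overall ALS flow under the same assumptions (Spark 2, the modified u.data at the HDFS path used earlier, tab-separated userID / movieID / rating / timestamp, and the hypothetical user 0):
    # minimal ALS sketch (not the exact MovieRecommendationsALS.py script)
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.functions import col, lit
    from pyspark.ml.recommendation import ALS

    spark = SparkSession.builder.appName("ALSSketch").getOrCreate()

    def parseLine(line):
        fields = line.split()
        return Row(userID=int(fields[0]), movieID=int(fields[1]), rating=float(fields[2]))

    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    ratings = spark.createDataFrame(lines.map(parseLine)).cache()

    # train the collaborative filtering model
    als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
    model = als.fit(ratings)

    # score every movie rated more than 100 times for the hypothetical user 0
    popularMovies = ratings.groupBy("movieID").count().filter(col("count") > 100) \
        .select("movieID").withColumn("userID", lit(0))
    recommendations = model.transform(popularMovies)
    recommendations.orderBy("prediction", ascending=False).show(10)

    spark.stop()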
  • Here are the results of the movie recommendations using machine learning in Spark 2 – a nice group of movies recommended below:
  • [Screenshot: recommendation results]