|By MC Brown||
|November 20, 2014 04:30 PM EST||
Replicating from MySQL and Oracle into Hadoop
The notion of a database silo - that is, a single database that contains everything and operates in isolation - is a rapidly fading concept in most companies. Many use multiple databases to take advantage of the different range of functionality, from their transactional store, to caching and session stores using NoSQL and, more recently, the transfer of information into analytical stores such as Hadoop.
This raises a problem when it comes to moving the data about. There are many different solutions available if all you want to do is move some data between systems through import and export. These are clumsy solutions, they can be scripted, but more usually you want to provide a regular stream of changes into Hadoop so that you can process and analyze the data as quickly as possible. In this article, we'll examine the traditional dump and load solutions and look at a solution that enables real-time replication of data from MySQL and Oracle into Hadoop.
Ignoring just for the moment where the data will come from, before we start doing any kind of import into Hadoop you need to think about how the data will be used and accessed on the Hadoop side. The temptation is to ignore the destination format and information, but this can lead to long-term problems in terms of understanding the data and processing it effectively.
Underlying all the decisions of what part of Hadoop will use the data is how the data is stored. Within Hadoop, data is written into the HDFS file system. HDFS stores data across your Hadoop cluster by first dividing up the file by the configured block size, and second by providing replicas of these blocks across the cluster. These replicas serve two purposes:
- They enable faster and more easily distributed processing. With multiple copies, there are multiple nodes that can process the local copy of the data without having to copy it around the cluster at the time of processing.
- They provide high availability, by allowing a single node in the cluster to fail without you losing access to the data it stores.
Ultimately this leads to the first key parameter for data loading into Hadoop - larger files are better. The larger the file, the more blocks it will be divided into, the more it will be distributed. Ergo, the faster it will be to process across the cluster as multiple nodes are able to work on each block individually.
When it comes to the actual file format, in most cases, Hadoop is designed to work with fairly basic textual data. Rather than the complicated binary formats that traditional databases use, Hadoop is just as happy to process CSV or JSON-based information.
One of the many benefits of Hadoop and the HDFS system is that the distributed nature makes it easy to parse and understand a variety of formats. In general, you are better off writing to a simpler format, such as CSV, that can then be parsed and used by multiple higher-level interfaces, like Hive or Impala. This sounds counter-intuitive, but native binary representations when processed in a distributed fashion often complicate the process of ingesting the data.
So with that in mind, we can generate some simple data to be loaded into Hadoop from our traditional database environment by generating a simple CSV file.
There are many different ways in which you can generate CSV files in both Oracle and MySQL. In both solutions you can normally do a dump of the information from a table, or from the output of a query, into a CSV file.
For example, with MySQL you can use the SELECT INTO SQL statement:
mysql> SELECT title, subtitle, servings, description into OUTFILE 'data.csv' FIELDS TERMINATED BY ',' FROM recipes t;
Within Oracle, use the SPOOL method within sqlplus, or manually combine the columns together. Alternatively, there are numerous solutions and tools for reading SQL data and generating CSV.
Once you've generated the file, the easiest way to get the data into HDFS is to copy the information from the generated file into HDFS using the hadoop command-line tool.
$ hadoop fs -copyFromLocal data.csv /user/
The problem with this approach is what do you do the next time you want to export the data? Do you export the whole batch, or just the changed records? And how do you identify the changed information, and more importantly merge that back once it's in HDFS. More importantly, all you have is the raw data; to use it, for example, within Hive and perform queries on the information, requires manually generating the DDL. Let's try a different tool, Sqoop.
Sqoop is a standard part of Hadoop and uses JDBC to access remote databases of a variety of types and then transfers the information across into Hadoop. The way it does this is actually not vastly different from the manual export process provided above; it runs the equivalent of a 'SELECT * FROM TABLE' and then writes the generated information into HDFS for you.
There are some differences. For one, Sqoop will do this in parallel for you. For example, if you have 20 nodes in your Hadoop cluster, Sqoop will fire up 20 processes that first identify and split up the extraction, and then give each node the range of records to be extracted. When moving millions of rows of data, it is obviously more efficient to be doing this in parallel, providing your server is capable of handling the load.
Using Sqoop is simple, you supply the JDBC connectivity information while logged in to your Hadoop cluster, and effectively suck the data across:
$ sqoop import-all-tables --connect jdbc:mysql://192.168.0.240/cheffy
This process will create a file within HDFS with the extracted data:
$ hadoop fs -ls access_log
Found 6 items
-rw-r--r-- 3 cloudera cloudera 0 2013-08-15 09:37 access_log/_SUCCESS
drwxr-xr-x - cloudera cloudera 0 2013-08-15 09:37 access_log/_logs
-rw-r--r-- 3 cloudera cloudera 36313694 2013-08-15 09:37 access_log/part-m-00000
-rw-r--r-- 3 cloudera cloudera 36442312 2013-08-15 09:37 access_log/part-m-00001
-rw-r--r-- 3 cloudera cloudera 36797470 2013-08-15 09:37 access_log/part-m-00002
-rw-r--r-- 3 cloudera cloudera 36321038 2013-08-15 09:37 access_log/part-m-00003
Sqoop itself is quite flexible, for example you can read data from a variety of sources, and write that out to files in various formats, including generating DDL for use within Hive.
Sqoop also supports incremental loads; this is achieved by either using a known auto-incrementing ID that can be used as the change identifier, or by changing your DDL to provide a date time or other column to help identify the last export and new data. For example, using an auto-increment:
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
--table chicago --check-column id --incremental append --last-value=2168224
The problem with Sqoop is that not everybody wants to change their DDL or auto-increment data. Meanwhile, the problem with both manual and Sqoop based exports is that performing a query, even a limiting one, has the effect of removing data from your memory cache, which may ultimately affect the performance of the application running on the source database. This is not a situation you want.
Furthermore, automating the process, either with direct imports or Sqoop is not as straightforward as it seems either.
A better solution is to replicate the information in real-time using a tool such as Tungsten Replicator. There are methods built into both MySQL and Oracle for effectively extracting the data:
- MySQL provides the binary log, which contains a simple sequential list of all the changes to the database. These can be statement based or row based, or a mixture.
- Oracle provides multiple tools, but Tungsten Replicator is designed to work the Change Data Capture (CDC) system to extract row-based information from the database changes.
By reading the row-based changes out of the source database, Tungsten Replicator can formulate this information into CSV. A simple diagram explains the basic process.
This effectively replaces both the manual and Sqoop-based processes with an automated, and constant, stream of data from the source database into Hadoop using HDFS. The exact sequence is:
- Data is read from the source database, using the binary log or CDC information. This data is already in row format and each table should have a primary key.
- The row-based changes are stored within the THL format, which consists of the raw ROW data and metadata, such as the column names, primary key and indexing information, and any reformatting required, such as changing the ENUM or SET data types into equivalent strings. THL also associates a unique transaction ID with each block of committed data.
- The THL data is transferred over to the slave replicator, which is running within the Hadoop cluster.
- The slave replicator generates a CSV file per table containing a configurable number of rows or within a specific time limit, providing a regular stream of data. The CSV data itself consists of an operation type (insert, or delete, with updates represented as a delete of the original data and an insert of the new data), the sequence number, the primary key information, and the raw row data itself. The reason for this format is how it is used on the other side.
- The CSV is then copied into the HDFS file system.
This actually only gets the raw change information from the source database and out into HDFS.
This is an important distinction from the manual export and Sqoop processes; Tungsten Replicator effectively stores a CSV version of the change log information.
To turn that change data into we need to materialize the change data into a table that looks like the original table from where the data was generated. The materialization process is actually very simple; within Hadoop we can write a map-reduce script that does the following:
- Orders all the changes by primary key and transaction ID
- Ignores any row that is a delete
- Generates a row for each row marked as an insert, picking the 'last' row (by transaction id) from the list of available rows
This is perhaps clearer in the diagram below, where the change log on the left is translated into the two rows of actual data on the right.
The materialization process needs to be run on every table, and because of the way it works with relation to the idempotency of the primary key information for each row, it can be used both to merge with the current dataset, with data that has previously been provisioned by a using the manual process or Sqoop, and previous executions of the tool on the data. Once the changes have been migrated, the actual change data can be removed.
Using the Change Data
Since you've moved the change data over, another option, rather than generating carbon copy tables, is to actually use the change data. You can, for example, process the information and look at the same transaction data over time or provide a sample of what your data looked like at a specific time. For example, in a sales environment, you might use this to examine the cost and relative sales for products over time when their prices changed.
There are many solutions available for moving data, and indeed getting data into Hadoop is altogether complicated, but there are benefits and pitfalls to the solutions available. Both manual and Sqoop-based solutions tend to be network and resource heavy, and designed to duplicate data from one side to the other.
The right solution needs to provide the ability to analyze the transactional data in a completely different fashion that may provide additional depth and breadth from your existing transactional data store.
Manual via CSV
Possible with DDL changes
Requires DDL changes
Full table scan
Full and partial table scans
Low-impact binlog scan
- Cloud Expo 2011 East To Attract 10,000 Delegates and 200 Exhibitors
- Cloud Expo New York to Attract More Than 8,000 Delegates
- New Release of Quest Toad for Oracle Offers Enhancements to Reduce Risk
- Twelve New Programming Languages: Is Cloud Responsible?
- The Future of Cloud Computing: Industry Predictions for 2012
- Universal Middleware: What's Happening With OSGi and Why You Should Care
- Oracle To Keynote Cloud Computing Expo
- Oracle-Sun: Jonathan Schwartz Writes His Toughest Ever Email
- An Exclusive Interview with Oracle, Cloud Expo 2010 Diamond Sponsor
- Why SOA Is a Good Fit for CRM Solutions