Data Protection for Hadoop Environments

One of the questions I am often asked is: do we need data protection for Hadoop environments?

It is an unusual question because most of my customers don't ask whether they need data protection for Oracle, DB2, SAP, Teradata or SQL environments. I think it is safe to say the majority of these environments are always protected.

So I started thinking. What motivates people to question whether they need data protection for Hadoop?

I boiled it down to three themes:

  1. Very few (if any) of the software backup vendors have solutions for Hadoop
  2. Hadoop's size and scalability are daunting when you consider that most of us come from a background of protecting monolithic environments in the tens of terabytes rather than the hundreds or thousands of terabytes of a single Hadoop system
  3. Hadoop has some inbuilt data protection properties

So if we play it back: we can't turn to our traditional backup vendors as they don't have integrated Hadoop solutions, we are daunted by the size and scale of the problem, and so we are left with the hope that Hadoop's inbuilt data protection properties will be good enough.

Before we dive into whether Hadoop's inbuilt data protection properties are good enough, I wanted to share some fundamental differences between Hadoop and traditional Enterprise systems. This will help us navigate how we should think about data protection in a Hadoop environment.

Distributed versus Monolithic

Hadoop is a distributed system that spans many servers. Not only is the computing distributed but for the standard Hadoop architecture, storage is distributed as well. Contrast this to a traditional Enterprise application that spans only a few servers and is usually reliant on centralised intelligent storage systems.

From a data protection perspective, which is going to be more challenging?

Protecting a centralised data repository that is addressable by a few servers is arguably simpler than supporting a large collection of servers each owning a portion of the data repository that is captive to the server.

When we are challenged to protect large volumes of data, we are frequently forced to resort to infrastructure-centric approaches to data protection. This is where we rely on the infrastructure to optimise the flow of data between the source of the data undergoing protection and the protection storage system. In many cases we have to resort to intelligent storage or, in the case of virtualisation, the hypervisor to provide these optimisations. However, in a standard Hadoop architecture where physical servers with direct attached storage are used, neither of these approaches is available.

To comply with the standard Hadoop architecture, the protection methodology needs to be software-driven and ideally should exploit the distributed properties of the Hadoop architecture so that data protection processing can scale with the environment.

Mixed Workloads

Hadoop systems typically support different applications and business processes, many of which will be unrelated. Contrast this to a traditional Enterprise application, which consists of a collection of related systems working together to deliver an application (e.g. Customer Relationship Management) that supports one or more business processes. For these Enterprise systems we apply consistent data protection policies across the systems that support the application. We could go one step further and treat all upstream and downstream application interfaces and dependencies with the same levels of protection.

Hadoop environments can support many Enterprise applications and business processes. Each application will vary in importance, much like production and non-production system classifications. The importance of Hadoop workloads is a characteristic we should exploit when devising a data protection strategy. If we fall into the trap of treating everything equally, we are back to being overwhelmed by the size and scope of the problem ahead of us.

So, when thinking about data protection for Hadoop environments we should focus our efforts on the applications and corresponding data sets that matter. We may find that doing this changes the profile of the challenge from a single 500TB protection problem to a 100TB problem addressed with method A, a 200TB problem addressed with method B, and 200TB that does not require additional levels of protection.

Hadoop Inbuilt Data Protection

There are two styles of data protection provided by Hadoop.

Onboard protection is built into the Hadoop Distributed File System (HDFS). The level of protection provided is confined to an individual Hadoop cluster. If the Hadoop cluster or file system suffers a major malfunction then these methods will likely be compromised.

Offboard protection is primarily concerned with creating copies of files from one Hadoop cluster to another Hadoop cluster or independent storage system.

Onboard Data Protection Features

In a typical Enterprise application we usually rely on the storage system to provide data redundancy both from an access layer (storage controllers) and persistence layer (storage media).

In Hadoop, redundancy at the persistence layer is provided by maintaining N-way copies (3 by default) of each data block across data nodes. This is called the system-wide replication factor and is applied by default when files are created. The replication factor can also be defined explicitly when files are created and can be modified retrospectively. This flexibility enables redundancy to be aligned to the importance and value of data.
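To make this concrete, here is a brief sketch of how the replication factor can be set per file using the standard HDFS client (the path and values below are hypothetical):

[root@hadoop ~]# hdfs dfs -D dfs.replication=2 -put /tmp/important.dat /data/important.dat
[root@hadoop ~]# hdfs dfs -setrep -w 4 /data/important.dat

The first command writes the file with a replication factor of 2, overriding the system-wide default, while the second raises it to 4 retrospectively and waits for the extra replicas to be created.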

Redundancy at the access layer is provided by the NameNode. The NameNode tracks the location and availability of data blocks and instructs Hadoop clients which data nodes should be used when reading and writing data blocks.

Hadoop provides proactive protection. Many filesystems assume data stays correct after it is written. This is a naive assumption that has been researched extensively. HDFS protects against data corruption by verifying the integrity of data blocks and repairing them from replica copies. This process occurs as a background task periodically and when Hadoop clients read data from HDFS.
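As an aside, block health can also be inspected on demand with the HDFS fsck utility; a quick sketch (the path is hypothetical):

[root@hadoop ~]# hdfs fsck /data -files -blocks -locations
[root@hadoop ~]# hdfs fsck / -list-corruptfileblocks

The first command reports the blocks and replica locations for each file under the directory; the second lists any files the NameNode currently considers to have missing or corrupt blocks.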

HDFS supports snapshots of the entire file system or of individual directories. Nested snapshot directories are not allowed. HDFS snapshots are read-only logical copies of directories and files that preserve the state of persistent data. One unexpected behaviour of HDFS snapshots is that they are not atomic. In our testing, HDFS snapshots only preserve consistency on file close. What this means is that if a snapshot is taken of an open file, the resulting snapshot will represent the file data once the open file is closed. This behaviour is at odds with traditional storage and filesystem based snapshot implementations that preserve data at the time of the snapshot regardless of state.

To illustrate this point we created a snapshot (s1) of a closed file then we started appending to the same file and at the same time created another snapshot (s2). Then we compared the size of the original file with the two snapshot versions. The first snapshot did not grow and represented the file data before the append operation. The second snapshot grew after it was created and represented the file data after the append operation completed (i.e. on file close).

Here is the experiment if you want to try it yourself.

Create a directory and enable snapshots.

[root@hadoop ~]# hdfs dfs -mkdir /mydir
[root@hadoop ~]# hdfs dfsadmin -allowSnapshot /mydir
Allowing snaphot on /mydir succeeded

Upload a file to the directory.

[root@hadoop ~]# hdfs dfs -put /boot/vmlinuz-3.10.0-123.el7.x86_64 /mydir
[root@hadoop ~]# hdfs dfs -ls /mydir
Found 1 items
-rw-r--r-- 3 root supergroup 4902656 2015-01-07 04:23 /mydir/vmlinuz-3.10.0-123.el7.x86_64

Create a snapshot of the directory. In this case the file in the directory is idle.

[root@hadoop ~]# hdfs dfs -ls /mydir
Found 1 items
-rw-r--r-- 3 root supergroup 2630536960 2015-01-07 04:27 /mydir/vmlinuz-3.10.0-123.el7.x86_64
[root@hadoop ~]# hdfs dfs -createSnapshot /mydir s1
Created snapshot /mydir/.snapshot/s1
[root@hadoop ~]# hdfs dfs -ls /mydir/.snapshot/s1
Found 1 items
-rw-r--r-- 3 root supergroup 2630536960 2015-01-07 04:27 /mydir/.snapshot/s1/vmlinuz-3.10.0-123.el7.x86_64

The size of the original file and snapshot version are the same. This is what we expected.

Append to the original file and in the background create a new snapshot (s2).

[root@hadoop ~]# hdfs dfs -appendToFile /vmlinuz-3.10.0-123.el7.x86_64 /mydir/vmlinuz-3.10.0-123.el7.x86_64 &
[1] 20263
[root@hadoop ~]# hdfs dfs -ls /mydir/vmlinuz-3.10.0-123.el7.x86_64
-rw-r--r-- 3 root supergroup 2684354560 2015-01-07 04:27 /mydir/vmlinuz-3.10.0-123.el7.x86_64
[root@hadoop ~]# hdfs dfs -createSnapshot /mydir s2
Created snapshot /mydir/.snapshot/s2
[root@hadoop ~]# hdfs dfs -ls /mydir/.snapshot/s2/vmlinuz-3.10.0-123.el7.x86_64
-rw-r--r-- 3 root supergroup 3489660928 2015-01-07 04:27 /mydir/.snapshot/s2/vmlinuz-3.10.0-123.el7.x86_64
[root@hadoop ~]# hdfs dfs -ls /mydir/.snapshot/s2/vmlinuz-3.10.0-123.el7.x86_64
-rw-r--r-- 3 root supergroup 3892314112 2015-01-07 04:27 /mydir/.snapshot/s2/vmlinuz-3.10.0-123.el7.x86_64
[root@hadoop ~]# hdfs dfs -ls /mydir/.snapshot/s1
Found 1 items
-rw-r--r-- 3 root supergroup 2630536960 2015-01-07 04:27 /mydir/.snapshot/s1/vmlinuz-3.10.0-123.el7.x86_64

Notice the second snapshot (s2) continues to grow as the file append runs in the background. The first snapshot (s1) does not change.

This demonstrates HDFS snapshots provide consistency only on file close. This may well be how the designers intended them to work. Coming from a storage background it is important to understand this behaviour particularly if HDFS snapshots form part of your data protection strategy.

Hadoop supports the concept of a recycle bin that is implemented by the HDFS Trash feature. When files are deleted using the HDFS client they are moved to a user-level trash folder where they persist for a predefined amount of time. Once the time elapses the file is deleted from the HDFS namespace and the referenced blocks are reclaimed as part of the space reclamation process. Files that have not expired can be moved back into a directory and in this regard can be used to undo a delete operation.

Below is an example of a file that was deleted and subsequently moved to trash.

[hdfs@hdp-1 ~]$ hdfs dfs -rm /tmp/myfile
14/12/01 16:00:55 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 21600000 minutes, Emptier interval = 0 minutes.
Moved: 'hdfs://hdp-1.mlab.local:8020/tmp/myfile' to trash at: hdfs://hdp-1.mlab.local:8020/user/hdfs/.Trash/Current

If we want to access the file in trash we can use regular HDFS commands referencing the file's trash path.

[hdfs@hdp-1 ~]$ hdfs dfs -ls /user/hdfs/.Trash/Current/tmp
Found 1 items
-rw-r--r-- 2 hdfs hdfs 13403947008 2014-11-25 15:39 /user/hdfs/.Trash/Current/tmp/myfile
[hdfs@hdp-1 ~]$ hdfs dfs -mv /user/hdfs/.Trash/Current/tmp/myfile /tmp

There are a few caveats to be aware of:

  • Trash is implemented in the HDFS client only. If we interface with HDFS using a programmatic API, deleted files are not moved to trash.
  • Use of trash cannot be enforced. A user can bypass trash by specifying the -skipTrash argument to the HDFS client (see the example below).
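For completeness, here is what bypassing and emptying trash looks like with the HDFS client; the path is hypothetical, and note that the trash retention period itself is governed by the fs.trash.interval property on the cluster:

[root@hadoop ~]# hdfs dfs -rm -skipTrash /tmp/scratchfile
[root@hadoop ~]# hdfs dfs -expunge

The first command deletes the file immediately without a trash copy; the second checkpoints the current trash contents and removes checkpoints older than the configured retention interval.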

Offboard Data Protection Features

Hadoop supports copying files in and out of Hadoop clusters using Hadoop Distributed Copy (distcp). This technology is designed to copy large volumes of data within the same Hadoop cluster, to another Hadoop cluster, or to Amazon S3 (or an S3-compliant object store), OpenStack Swift, FTP or NAS storage visible to all Hadoop data nodes. Support for Microsoft Azure has been included in the 2.7.0 release (refer to HADOOP-9629).

A key benefit of distcp is it uses Hadoop’s parallel processing model (MapReduce) to carry out the work. This is important for large Hadoop clusters as a protection method that does not leverage the distributed nature of Hadoop is bound to hit scalability limits.
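For context, a minimal cluster-to-cluster copy looks like the following; the NameNode addresses are placeholders:

[root@hadoop ~]# hadoop distcp hdfs://nn1:8020/source/path hdfs://nn2:8020/target/path

distcp builds a file list from the source, splits it amongst map tasks and runs those tasks across the nodes of the cluster executing the job, so aggregate copy throughput scales with the size of the cluster.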

One downside of the current distcp implementation is that the replication performance of a single file is bound to one map task running on one data node.

Why is this a problem?

Each file requiring replication can only consume the networking resources available to the data node running the corresponding map task. For many small files this does not represent a problem, as many map tasks can be used to distribute the workload. However, for the very large files that are common in Hadoop environments, the performance of individual file copies will be limited by the bandwidth available to an individual data node. For example, if data nodes are connected via 1 GbE and the average file size in the cluster is 10TB, then the minimum amount of time required to copy one average file is around 22 hours (10TB is roughly 80,000 gigabits, which at 1 Gb/s takes about 80,000 seconds).
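distcp does expose knobs for parallelism and throughput, although neither helps a single very large file; a sketch of their use with illustrative values:

[root@hadoop ~]# hadoop distcp -m 40 -bandwidth 100 hdfs://hadoop:9000/files s3n://hadoopbackups/files

The -m option caps the number of map tasks and -bandwidth limits each map to approximately the specified number of MB per second; an individual file is still copied by a single map regardless of these settings.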

To address this situation distcp will need to evolve to support distributed block-level copy, which to my knowledge is not currently available. Another option is to compress the files transparently prior to transmission, however that is not available either (see HADOOP-8065).

Hadoop Distributed Copy to EMC ViPR (S3 interface)

We previously mentioned it is possible to copy files from Hadoop to S3-compliant object stores. This section describes how to use distributed copy with the EMC ViPR object store, which is S3-compliant to the extent required.

S3 support is provided by the JetS3t API (s3 and s3n drivers) or the AWS S3 SDK (s3a driver). The JetS3t API allows the S3 endpoint to be configured to a target other than Amazon Web Services (AWS). To do this we need to set some properties that will get passed to the JetS3t API.

Create a file called jets3t.properties in your Hadoop configuration directory. In my case it is /usr/local/hadoop/etc/hadoop.

Set some properties as follows:

s3service.s3-endpoint=vipr-ds1.mlab.local
s3service.s3-endpoint-http-port=9020
s3service.s3-endpoint-https-port=9021
s3service.disable-dns-buckets=true
s3service.https-only=false

Here we set the S3 endpoint to our ViPR data store (if you have multiple data stores, point this at your load balancer) and set the ports for HTTP and HTTPS access that ViPR listens on. We disabled the use of DNS to resolve bucket names since we did not set up wildcard DNS for S3 bucket naming. We also disabled HTTPS as it requires real certificates rather than self-signed ones to function.

Also in the same directory is the Hadoop core-site.xml. Set the following properties in this file:

<property>
 <name>fs.s3.awsAccessKeyId</name>
 <value>root</value>
</property>
<property>
 <name>fs.s3n.awsAccessKeyId</name>
 <value>root</value>
</property>
<property>
 <name>fs.s3n.awsSecretAccessKey</name>
 <value>6U08BuxfB6DTWPMzUU7tgfXDGBzXMEu/DLBG++5m</value>
</property>
<property>
 <name>fs.s3.awsSecretAccessKey</name>
 <value>6U08BuxfB6DTWPMzUU7tgfXDGBzXMEu/DLBG++5m</value>
</property>

The access key id and secret access key are obtained from ViPR and defined when the ViPR tenant and bucket are set up.

Here are a few screenshots of the ViPR bucket setup and object access key.
[Screenshot: ViPR bucket configuration]
[Screenshot: ViPR object access key]

We also need to define the Java classes the Hadoop client should call when S3 URIs are used. Add the following properties to core-site.xml:

<property>
 <name>fs.s3.impl</name>
 <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>
<property>
 <name>fs.s3n.impl</name>
 <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
 <name>fs.AbstractFileSystem.s3n.impl</name>
 <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
 <name>fs.AbstractFileSystem.s3.impl</name>
 <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>

Ensure both the core-site.xml and jets3t.properties files are copied to all Hadoop data nodes.

We now need to add the S3 jar files to the Hadoop classpath. One way to do this is to link the jar files into a directory included in the Hadoop classpath.

To view the classpath use:

[root@hadoop ~]# hadoop classpath
/usr/local/hadoop/etc/hadoop:/usr/local/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/share/hadoop/common/*:/usr/local/hadoop/share/hadoop/hdfs:/usr/local/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar

Symbolic link the jar files into one of these directories.

[root@hadoop ~]# cd /usr/local/hadoop/share/hadoop/hdfs/
[root@hadoop hdfs]# ln -s ../tools/lib/hadoop-aws-2.6.0.jar
[root@hadoop hdfs]# ln -s ../tools/lib/aws-java-sdk-1.7.4.jar
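An alternative to symbolic links, if you prefer not to touch the distribution directories, is to extend HADOOP_CLASSPATH in hadoop-env.sh; treat this as a sketch since it covers the client JVM, and on some distributions the map tasks may also need the path added to mapreduce.application.classpath:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/local/hadoop/share/hadoop/tools/lib/hadoop-aws-2.6.0.jar:/usr/local/hadoop/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar

Whichever approach you choose, the change must be made on every node that will run map tasks.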

Now we can start using the ViPR object store as both a source for Hadoop processing and a target for distcp.

Here is how we copy a file from HDFS to the ViPR object store bucket named hadoopbackups.

[root@hadoop ~]# hdfs dfs -put /boot/vmlinuz-3.10.0-123.el7.x86_64 s3n://hadoopbackups/
14/12/30 00:55:05 INFO s3native.NativeS3FileSystem: OutputStream for key 'vmlinuz-3.10.0-123.el7.x86_64._COPYING_' writing to tempfile '/tmp/hadoop-root/s3/output-4123538761754760460.tmp'
14/12/30 00:55:06 INFO s3native.NativeS3FileSystem: OutputStream for key 'vmlinuz-3.10.0-123.el7.x86_64._COPYING_' closed. Now beginning upload
14/12/30 00:55:06 INFO s3native.NativeS3FileSystem: OutputStream for key 'vmlinuz-3.10.0-123.el7.x86_64._COPYING_' upload complete

Here is how to list the contents of the ViPR object store bucket.

[root@hadoop ~]# hdfs dfs -ls s3n://hadoopbackups/
Found 1 items

Here is how to copy a directory using distcp from HDFS to the ViPR object store bucket.

[root@hadoop ~]# hadoop distcp -pt -update -delete -append hdfs://hadoop:9000/files/ s3n://hadoopbackups/files
14/12/30 23:17:56 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=true, deleteMissing=true, ignoreFailures=false, maxMaps=20, sslConfigurationFile=’null‘, copyStrategy=’uniformsize’, sourceFileListing=null, sourcePaths=[hdfs://hadoop:9000/files], targetPath=s3n://hadoopbackups/files, targetPathExists=false, preserveRawXattrs=false}
14/12/30 23:17:56 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/30 23:17:57 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
14/12/30 23:17:57 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
14/12/30 23:17:57 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/30 23:17:58 INFO mapreduce.JobSubmitter: number of splits:10
14/12/30 23:17:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1419937768824_0002
14/12/30 23:17:58 INFO impl.YarnClientImpl: Submitted application application_1419937768824_0002
14/12/30 23:17:58 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1419937768824_0002/
14/12/30 23:17:58 INFO tools.DistCp: DistCp job-id: job_1419937768824_0002
14/12/30 23:17:58 INFO mapreduce.Job: Running job: job_1419937768824_0002
14/12/30 23:18:05 INFO mapreduce.Job: Job job_1419937768824_0002 running in uber mode : false
14/12/30 23:18:05 INFO mapreduce.Job: map 0% reduce 0%
14/12/30 23:18:20 INFO mapreduce.Job: map 50% reduce 0%
14/12/30 23:18:21 INFO mapreduce.Job: map 60% reduce 0%
14/12/30 23:18:22 INFO mapreduce.Job: map 70% reduce 0%
14/12/30 23:18:24 INFO mapreduce.Job: map 100% reduce 0%
14/12/30 23:18:25 INFO mapreduce.Job: Job job_1419937768824_0002 completed successfully
14/12/30 23:18:25 INFO mapreduce.Job: Counters: 38
 File System Counters
 FILE: Number of bytes read=0
 FILE: Number of bytes written=1110540
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 HDFS: Number of bytes read=44129667
 HDFS: Number of bytes written=0
 HDFS: Number of read operations=97
 HDFS: Number of large read operations=0
 HDFS: Number of write operations=20
 S3N: Number of bytes read=0
 S3N: Number of bytes written=44123904
 S3N: Number of read operations=0
 S3N: Number of large read operations=0
 S3N: Number of write operations=0
 Job Counters
 Launched map tasks=10
 Other local map tasks=10
 Total time spent by all maps in occupied slots (ms)=131490
 Total time spent by all reduces in occupied slots (ms)=0
 Total time spent by all map tasks (ms)=131490
 Total vcore-seconds taken by all map tasks=131490
 Total megabyte-seconds taken by all map tasks=134645760
 Map-Reduce Framework
 Map input records=10
 Map output records=0
 Input split bytes=1340
 Spilled Records=0
 Failed Shuffles=0
 Merged Map outputs=0
 GC time elapsed (ms)=6263
 CPU time spent (ms)=23840
 Physical memory (bytes) snapshot=1677352960
 Virtual memory (bytes) snapshot=21159137280
 Total committed heap usage (bytes)=1164443648
 File Input Format Counters
 Bytes Read=4423
 File Output Format Counters
 Bytes Written=0
 org.apache.hadoop.tools.mapred.CopyMapper$Counter
 BYTESCOPIED=44123904
 BYTESEXPECTED=44123904
 COPY=10

We can analyse the distcp job in more detail by browsing the job history. Here we can see the job was split into 10 map tasks (one file per map task).

[Screenshot: job history showing the distcp job split into 10 map tasks]

We can see the duration of each map task.

[Screenshot: duration of each map task]

We can review various counters. The most interesting is the number of bytes copied and bytes expected.

[Screenshot: job counters showing bytes copied and bytes expected]

These counters can also be viewed with the command line.

[root@hadoop ~]# mapred job -counter job_1419937768824_0003 'org.apache.hadoop.tools.mapred.CopyMapper$Counter' BYTESCOPIED
14/12/30 23:21:07 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/30 23:21:08 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
44123904
[root@hadoop ~]# mapred job -counter job_1419937768824_0003 'org.apache.hadoop.tools.mapred.CopyMapper$Counter' BYTESEXPECTED
14/12/30 23:21:16 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/30 23:21:17 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
44123904

We can also copy files from the ViPR object store bucket to HDFS.

[root@hadoop ~]# hadoop distcp -pt -update -delete -append s3n://hadoopbackups/files/ hdfs://hadoop:9000/files2
14/12/30 23:18:54 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=true, deleteMissing=true, ignoreFailures=false, maxMaps=20, sslConfigurationFile=’null‘, copyStrategy=’uniformsize’, sourceFileListing=null, sourcePaths=[s3n://hadoopbackups/files], targetPath=hdfs://hadoop:9000/files2, targetPathExists=false, preserveRawXattrs=false}
14/12/30 23:18:54 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/30 23:18:56 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
14/12/30 23:18:56 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
14/12/30 23:18:56 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/30 23:18:57 INFO mapreduce.JobSubmitter: number of splits:10
14/12/30 23:18:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1419937768824_0003
14/12/30 23:18:57 INFO impl.YarnClientImpl: Submitted application application_1419937768824_0003
14/12/30 23:18:57 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1419937768824_0003/
14/12/30 23:18:57 INFO tools.DistCp: DistCp job-id: job_1419937768824_0003
14/12/30 23:18:57 INFO mapreduce.Job: Running job: job_1419937768824_0003
14/12/30 23:19:03 INFO mapreduce.Job: Job job_1419937768824_0003 running in uber mode : false
14/12/30 23:19:04 INFO mapreduce.Job: map 0% reduce 0%
14/12/30 23:19:11 INFO mapreduce.Job: map 10% reduce 0%
14/12/30 23:19:14 INFO mapreduce.Job: map 50% reduce 0%
14/12/30 23:19:15 INFO mapreduce.Job: map 60% reduce 0%
14/12/30 23:19:16 INFO mapreduce.Job: map 70% reduce 0%
14/12/30 23:19:17 INFO mapreduce.Job: map 80% reduce 0%
14/12/30 23:19:19 INFO mapreduce.Job: map 90% reduce 0%
14/12/30 23:19:20 INFO mapreduce.Job: map 100% reduce 0%
14/12/30 23:19:20 INFO mapreduce.Job: Job job_1419937768824_0003 completed successfully
14/12/30 23:19:20 INFO mapreduce.Job: Counters: 38
 File System Counters
 FILE: Number of bytes read=0
 FILE: Number of bytes written=1110540
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 HDFS: Number of bytes read=5516
 HDFS: Number of bytes written=44123904
 HDFS: Number of read operations=143
 HDFS: Number of large read operations=0
 HDFS: Number of write operations=39
 S3N: Number of bytes read=44123904
 S3N: Number of bytes written=0
 S3N: Number of read operations=0
 S3N: Number of large read operations=0
 S3N: Number of write operations=0
 Job Counters
 Launched map tasks=10
 Other local map tasks=10
 Total time spent by all maps in occupied slots (ms)=88546
 Total time spent by all reduces in occupied slots (ms)=0
 Total time spent by all map tasks (ms)=88546
 Total vcore-seconds taken by all map tasks=88546
 Total megabyte-seconds taken by all map tasks=90671104
 Map-Reduce Framework
 Map input records=10
 Map output records=0
 Input split bytes=1340
 Spilled Records=0
 Failed Shuffles=0
 Merged Map outputs=0
 GC time elapsed (ms)=3352
 CPU time spent (ms)=21580
 Physical memory (bytes) snapshot=1673371648
 Virtual memory (bytes) snapshot=21154611200
 Total committed heap usage (bytes)=1137180672
 File Input Format Counters
 Bytes Read=4176
 File Output Format Counters
 Bytes Written=0
 org.apache.hadoop.tools.mapred.CopyMapper$Counter
 BYTESCOPIED=44123904
 BYTESEXPECTED=44123904
 COPY=10

This job is also distributed with multiple map tasks copying files from the ViPR object store to HDFS.

We can verify this by looking at the successful map attempts and in particular the status and node columns. These show the copy tasks were distributed amongst Hadoop data nodes.

[Screenshot: successful map attempts showing the status and node columns]

We can also list the files in the ViPR object store bucket. Keep in mind the command does not represent S3 folders the way you might expect (look at the files directory below).

[root@hadoop ~]# hdfs dfs -ls s3n://hadoopbackups/
Found 10 items
drwxrwxrwx - 0 1970-01-01 10:00 s3n://hadoopbackups/files
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_9

If we copy the files directory again to files2 things start to look confusing.

[root@hadoop ~]# hdfs dfs -ls s3n://hadoopbackups/
Found 20 items
drwxrwxrwx - 0 1970-01-01 10:00 s3n://hadoopbackups/files
drwxrwxrwx - 0 1970-01-01 10:00 s3n://hadoopbackups/files2
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-rw-rw- 1 4902656 2014-12-30 23:19 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_9
-rw-rw-rw- 1 4902656 2014-12-30 23:26 s3n://hadoopbackups/vmlinuz-3.10.0-123.el7.x86_64_9

However if we browse the bucket using s3fs (which I installed previously) all is well.

[root@hadoop ~]# s3fs -o url=http://vipr-ds1.mlab.local:9020 -o passwd_file=/usr/local/etc/passwd-s3fs hadoopbackups /mnt
[root@hadoop ~]# ls /mnt
files files2
[root@hadoop ~]# ls /mnt/files
vmlinuz-3.10.0-123.el7.x86_64_1 vmlinuz-3.10.0-123.el7.x86_64_3 vmlinuz-3.10.0-123.el7.x86_64_5 vmlinuz-3.10.0-123.el7.x86_64_7 vmlinuz-3.10.0-123.el7.x86_64_9
vmlinuz-3.10.0-123.el7.x86_64_2 vmlinuz-3.10.0-123.el7.x86_64_4 vmlinuz-3.10.0-123.el7.x86_64_6 vmlinuz-3.10.0-123.el7.x86_64_8
[root@hadoop ~]# ls /mnt/files2
vmlinuz-3.10.0-123.el7.x86_64_1 vmlinuz-3.10.0-123.el7.x86_64_3 vmlinuz-3.10.0-123.el7.x86_64_5 vmlinuz-3.10.0-123.el7.x86_64_7 vmlinuz-3.10.0-123.el7.x86_64_9
vmlinuz-3.10.0-123.el7.x86_64_2 vmlinuz-3.10.0-123.el7.x86_64_4 vmlinuz-3.10.0-123.el7.x86_64_6 vmlinuz-3.10.0-123.el7.x86_64_8

Note 1: keep this in mind when using hdfs dfs -ls against an S3 bucket.
Note 2: when copying directories from HDFS to S3, ensure there are no nested directories; we observed that distcp fails to copy the structure back into HDFS from the parent S3 folder.

We can also delete files from the ViPR object store bucket.

[root@hadoop ~]# hdfs dfs -rm -r -skipTrash s3n://hadoopbackups/files
Deleted s3n://hadoopbackups/files

Note: we need to specify the -skipTrash flag, otherwise the HDFS client will create a trash folder on the object store and move the files there instead of deleting them.

Hadoop Distributed Copy to EMC Data Domain (as NFS target)

In addition to S3, we can just as easily create copies of HDFS files on Data Domain protection storage for backup and archive.

To accomplish this we must first export an NFS share from the Data Domain system. For this experiment we created a Data Domain mtree called hadoop and exported it to the three Hadoop data nodes.
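If you prefer the Data Domain command line over the GUI, the equivalent steps look roughly like the following; this is my recollection of the DDOS syntax, so verify it against your release:

[root@hadoop ~]# ssh sysadmin@ddve-03.mlab.local mtree create /data/col1/hadoop
[root@hadoop ~]# ssh sysadmin@ddve-03.mlab.local nfs add /data/col1/hadoop hadoop,hadoop-1,hadoop-2

The first command creates the mtree and the second exports it over NFS to the three Hadoop data nodes.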

[Screenshot: Data Domain NFS export of the hadoop mtree to the three data nodes]

We then mounted the share on the three data nodes under /dd.

[root@hadoop ~]# ssh hadoop mount ddve-03.mlab.local:/data/col1/hadoop /dd
[root@hadoop ~]# ssh hadoop-1 mount ddve-03.mlab.local:/data/col1/hadoop /dd
[root@hadoop ~]# ssh hadoop-2 mount ddve-03.mlab.local:/data/col1/hadoop /dd

Now we can copy files from HDFS to Data Domain as follows:

[root@hadoop ~]# hadoop distcp /files file:///dd/
14/12/31 03:45:47 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile=’null‘, copyStrategy=’uniformsize’, sourceFileListing=null, sourcePaths=[/files], targetPath=file:/dd, targetPathExists=true, preserveRawXattrs=false}
14/12/31 03:45:47 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/31 03:45:52 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
14/12/31 03:45:52 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
14/12/31 03:45:52 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
14/12/31 03:45:53 INFO mapreduce.JobSubmitter: number of splits:10
14/12/31 03:45:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1419937768824_0007
14/12/31 03:45:54 INFO impl.YarnClientImpl: Submitted application application_1419937768824_0007
14/12/31 03:45:55 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1419937768824_0007/
14/12/31 03:45:55 INFO tools.DistCp: DistCp job-id: job_1419937768824_0007
14/12/31 03:45:55 INFO mapreduce.Job: Running job: job_1419937768824_0007
14/12/31 03:46:03 INFO mapreduce.Job: Job job_1419937768824_0007 running in uber mode : false
14/12/31 03:46:03 INFO mapreduce.Job: map 0% reduce 0%
14/12/31 03:46:16 INFO mapreduce.Job: map 20% reduce 0%
14/12/31 03:46:17 INFO mapreduce.Job: map 30% reduce 0%
14/12/31 03:46:20 INFO mapreduce.Job: map 50% reduce 0%
14/12/31 03:46:21 INFO mapreduce.Job: map 60% reduce 0%
14/12/31 03:46:22 INFO mapreduce.Job: map 70% reduce 0%
14/12/31 03:46:36 INFO mapreduce.Job: map 100% reduce 0%
14/12/31 03:46:37 INFO mapreduce.Job: Job job_1419937768824_0007 completed successfully
14/12/31 03:46:37 INFO mapreduce.Job: Counters: 33
 File System Counters
 FILE: Number of bytes read=0
 FILE: Number of bytes written=45215734
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 HDFS: Number of bytes read=44129771
 HDFS: Number of bytes written=0
 HDFS: Number of read operations=97
 HDFS: Number of large read operations=0
 HDFS: Number of write operations=20
 Job Counters
 Launched map tasks=10
 Other local map tasks=10
 Total time spent by all maps in occupied slots (ms)=182909
 Total time spent by all reduces in occupied slots (ms)=0
 Total time spent by all map tasks (ms)=182909
 Total vcore-seconds taken by all map tasks=182909
 Total megabyte-seconds taken by all map tasks=187298816
 Map-Reduce Framework
 Map input records=10
 Map output records=0
 Input split bytes=1330
 Spilled Records=0
 Failed Shuffles=0
 Merged Map outputs=0
 GC time elapsed (ms)=5525
 CPU time spent (ms)=16300
 Physical memory (bytes) snapshot=1595691008
 Virtual memory (bytes) snapshot=21133045760
 Total committed heap usage (bytes)=1116733440
 File Input Format Counters
 Bytes Read=4537
 File Output Format Counters
 Bytes Written=0
 org.apache.hadoop.tools.mapred.CopyMapper$Counter
 BYTESCOPIED=44123904
 BYTESEXPECTED=44123904
 COPY=10

We can list the Data Domain contents.

[root@hadoop ~]# hdfs dfs -ls file:///dd/files
Found 9 items
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 1 root root 4902656 2014-12-31 03:46 file:///dd/files/vmlinuz-3.10.0-123.el7.x86_64_9

We can also protect the file copies on Data Domain using the retention lock feature. Retention lock is used to prevent files from being deleted until a certain date and time. This technique is commonly used to comply with data retention regulations, but it can also provide enhanced levels of protection from human error and malware, which for this use case is important because the protection copies are directly exposed to the system being protected.

Here is an example of using retention lock. Set the access time of the file to a date in the future.

[root@hadoop ~]# date
Wed Dec 31 12:02:34 EST 2014
[root@hadoop ~]# touch -a -t 201501011000 /dd/files/vmlinuz-3.10.0-123.el7.x86_64_1
[root@hadoop ~]# ls -lu /dd/files/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-r--r-- 1 root root 4902656 Jan 1 2015 /dd/files/vmlinuz-3.10.0-123.el7.x86_64_1

Then try and delete it.

[root@hadoop ~]# rm -f /dd/files/vmlinuz-3.10.0-123.el7.x86_64_1
rm: cannot remove '/dd/files/vmlinuz-3.10.0-123.el7.x86_64_1': Permission denied

Retention lock prevents the file from being deleted.

Note: retention lock must be licensed and enabled on the Data Domain mtree that we are using to store protection copies.

Data Domain also supports a feature called fast copy. This enables the Data Domain file system namespace to be duplicated to different areas on the same Data Domain system. The duplication process is almost instantaneous and creates a copy of the pointers representing the directories and files we are interested in copying. The copy consumes no additional space as the data remains deduplicated against the original.

The fast copy technology can be used to support an incremental forever protection strategy for Hadoop data sets with versioning enabled. Let’s demonstrate how this works in practice.

Let’s say we want to create weekly versioned copies of the HDFS /files directory on a Data Domain system.

First we create a copy of the initial version using distcp to the Data Domain mtree that we have mounted under /dd.

The first version will be copied to a directory named after the current date in YYYYMMDD format. In this example we will be using the date command with backticks to evaluate the current system date on the command line.

[root@hadoop ~]# hadoop distcp -pt -update -delete -append /files file:///dd/`date +%Y%m%d`

We can see the files were copied as expected.

[root@hadoop ~]# hdfs dfs -ls file:///dd/`date +%Y%m%d`
Found 9 items
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_9

Immediately after creating the copy on Data Domain we want to protect it further by taking a fast copy to an mtree that is not visible to Hadoop. To do this we need to have a second mtree setup on the Data Domain system (we created /data/col1/hadoop_fc ahead of time) that we fast copy into.

To run the fast copy command we need to have SSH keys set up between the controlling host and the Data Domain system. This was set up ahead of time for a user called hadoop. Keep in mind this entire process does not need to run from a Hadoop node. It should be run from an independent, trusted and secure host that has access to both Hadoop and Data Domain.
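Setting up the key is a one-off step; here is a sketch of how it can be done, with the adminaccess syntax quoted from memory, so check it against your DDOS version:

[root@hadoop ~]# ssh-keygen -t rsa
[root@hadoop ~]# ssh hadoop@ddve-03 adminaccess add ssh-keys < ~/.ssh/id_rsa.pub

Once the public key is registered for the hadoop user, the fastcopy commands below run without prompting for a password.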

Let's fast copy the current version from the hadoop mtree to the hadoop_fc mtree for added protection.

[root@hadoop ~]# ssh hadoop@ddve-03 filesys fastcopy source /data/col1/hadoop/`date +%Y%m%d` destination /data/col1/hadoop_fc/
Data Domain OS
(00:00) Waiting for fastcopy to complete…
Fastcopy status: fastcopy /data/col1/hadoop/20141231 to /data/col1/hadoop_fc/20141231: copied 9 files, 1 directory in 0.10 seconds

Now a week passes and we want to preserve today's version of the HDFS /files directory. To demonstrate the differences between the two versions, let's first delete a file from HDFS to show how this approach enables versioning.

[root@hadoop ~]# hdfs dfs -rm /files/vmlinuz-3.10.0-123.el7.x86_64_1
14/12/31 13:09:40 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 3600 minutes, Emptier interval = 3600 minutes.
Moved: 'hdfs://hadoop:9000/files/vmlinuz-3.10.0-123.el7.x86_64_1' to trash at: hdfs://hadoop:9000/user/root/.Trash/Current

Now we have 8 files in HDFS /files.

Before we create our weekly protection copy we want to fast copy the previous week's copy into today's. This way, when we distcp today's version of HDFS /files, it should recognise that the 8 files already exist and all it needs to do is delete the one file that no longer exists (the vmlinuz-3.10.0-123.el7.x86_64_1 file we deleted from HDFS).

Note: for this to work I set the system clock forward 7 days

First, fast copy last week's copy into today's copy.

[root@hadoop ~]# ssh hadoop@ddve-03 filesys fastcopy source /data/col1/hadoop/`date +%Y%m%d -d '7 days ago'` destination /data/col1/hadoop/`date +%Y%m%d`
Data Domain OS
Fastcopy status: fastcopy /data/col1/hadoop/20141231 to /data/col1/hadoop/20150107: copied 9 files, 1 directory in 0.03 seconds

Now this week's copy looks identical to last week's. That's what we want as our starting position for each subsequent version.

[root@hadoop ~]# hdfs dfs -ls file:///dd/`date +%Y%m%d -d '7 days ago'` file:///dd/`date +%Y%m%d`
Found 9 items
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20141231/vmlinuz-3.10.0-123.el7.x86_64_9
Found 9 items
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_1
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_9

Now we need to synchronise the current HDFS version of /files with this week's copy on the Data Domain. First let's look at our HDFS version.

[root@hadoop ~]# hdfs dfs -ls /files
Found 8 items
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_9

Now let's run distcp to synchronise this week's copy.

[root@hadoop ~]# hadoop distcp -pt -update -delete -append /files file:///dd/`date +%Y%m%d`
15/01/07 00:04:31 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=true, deleteMissing=true, ignoreFailures=false, maxMaps=20, sslConfigurationFile=’null‘, copyStrategy=’uniformsize’, sourceFileListing=null, sourcePaths=[/files], targetPath=file:/dd/20150107, targetPathExists=true, preserveRawXattrs=false}
15/01/07 00:04:31 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
15/01/07 00:04:32 INFO Configuration.deprecation: io.sort.mb is deprecated. Instead, use mapreduce.task.io.sort.mb
15/01/07 00:04:32 INFO Configuration.deprecation: io.sort.factor is deprecated. Instead, use mapreduce.task.io.sort.factor
15/01/07 00:04:32 INFO client.RMProxy: Connecting to ResourceManager at hadoop/192.168.0.94:8032
15/01/07 00:04:33 INFO mapreduce.JobSubmitter: number of splits:8
15/01/07 00:04:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1419937768824_0027
15/01/07 00:04:33 INFO impl.YarnClientImpl: Submitted application application_1419937768824_0027
15/01/07 00:04:33 INFO mapreduce.Job: The url to track the job: http://hadoop:8088/proxy/application_1419937768824_0027/
15/01/07 00:04:33 INFO tools.DistCp: DistCp job-id: job_1419937768824_0027
15/01/07 00:04:33 INFO mapreduce.Job: Running job: job_1419937768824_0027
15/01/07 00:04:39 INFO mapreduce.Job: Job job_1419937768824_0027 running in uber mode : false
15/01/07 00:04:39 INFO mapreduce.Job: map 0% reduce 0%
15/01/07 00:04:46 INFO mapreduce.Job: map 50% reduce 0%
15/01/07 00:04:47 INFO mapreduce.Job: map 75% reduce 0%
15/01/07 00:04:48 INFO mapreduce.Job: map 100% reduce 0%
15/01/07 00:04:49 INFO mapreduce.Job: Job job_1419937768824_0027 completed successfully
15/01/07 00:04:50 INFO mapreduce.Job: Counters: 32
 File System Counters
 FILE: Number of bytes read=0
 FILE: Number of bytes written=874648
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 HDFS: Number of bytes read=4631
 HDFS: Number of bytes written=504
 HDFS: Number of read operations=64
 HDFS: Number of large read operations=0
 HDFS: Number of write operations=16
 Job Counters
 Launched map tasks=8
 Other local map tasks=8
 Total time spent by all maps in occupied slots (ms)=39324
 Total time spent by all reduces in occupied slots (ms)=0
 Total time spent by all map tasks (ms)=39324
 Total vcore-seconds taken by all map tasks=39324
 Total megabyte-seconds taken by all map tasks=40267776
 Map-Reduce Framework
 Map input records=8
 Map output records=8
 Input split bytes=1080
 Spilled Records=0
 Failed Shuffles=0
 Merged Map outputs=0
 GC time elapsed (ms)=935
 CPU time spent (ms)=8660
 Physical memory (bytes) snapshot=1314115584
 Virtual memory (bytes) snapshot=16911384576
 Total committed heap usage (bytes)=931659776
 File Input Format Counters
 Bytes Read=3551
 File Output Format Counters
 Bytes Written=504
 org.apache.hadoop.tools.mapred.CopyMapper$Counter
 BYTESSKIPPED=39221248
 SKIP=8

As we expected distcp synchronised the current HDFS version of /files with this weeks protection copy. To achieve that it did very little work since the distcp target was pre-populated from the previous version using Data Domain fast copy.

We can verify by comparing HDFS /files with the current version on the Data Domain.

[root@hadoop ~]# hdfs dfs -ls /files file:///dd/`date +%Y%m%d`
Found 8 items
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 3 root supergroup 4902656 2014-12-30 23:16 /files/vmlinuz-3.10.0-123.el7.x86_64_9
Found 8 items
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_2
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_3
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_4
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_5
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_6
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_7
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_8
-rw-r--r-- 1 root root 4902656 2014-12-30 23:16 file:///dd/20150107/vmlinuz-3.10.0-123.el7.x86_64_9

We can continue this process indefinitely and, as a background process, clean up versions that we no longer require, remembering to clean up the versions from both the hadoop and hadoop_fc mtrees.
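Pulling the steps together, a weekly protection job could be scripted along the following lines. This is a rough sketch based on the commands above; the retention window, the use of rm for cleanup over NFS and the handling of the first run are my assumptions, and cleanup will fail for anything still under retention lock.

#!/bin/bash
# Weekly versioned copy of HDFS /files to Data Domain (sketch).
DD_HOST=hadoop@ddve-03
DD_MTREE=/data/col1/hadoop
DD_MTREE_FC=/data/col1/hadoop_fc
MOUNT=/dd
TODAY=$(date +%Y%m%d)
LASTWEEK=$(date +%Y%m%d -d '7 days ago')
RETAIN_DAYS=56

# Seed today's directory from last week's copy so distcp only transfers changes.
# On the very first run there is nothing to seed, so ignore a fastcopy failure here.
ssh $DD_HOST filesys fastcopy source $DD_MTREE/$LASTWEEK destination $DD_MTREE/$TODAY || true

# Synchronise the current HDFS /files with today's copy.
hadoop distcp -pt -update -delete -append /files file://$MOUNT/$TODAY

# Take an additional fast copy into the mtree that Hadoop cannot see.
ssh $DD_HOST filesys fastcopy source $DD_MTREE/$TODAY destination $DD_MTREE_FC/$TODAY

# Remove versions older than the retention window from the Hadoop-visible mtree.
# Versions in hadoop_fc are not mounted here and must be cleaned up on the Data Domain itself.
CUTOFF=$(date +%Y%m%d -d "$RETAIN_DAYS days ago")
for dir in $MOUNT/2*; do
  [ -d "$dir" ] || continue
  version=$(basename "$dir")
  if [ "$version" -lt "$CUTOFF" ]; then
    rm -rf "$dir"
  fi
done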

This approach to Hadoop data protection has some interesting benefits.

  • Produces an incremental forever approach (at the file level) that should scale quite well.
  • Leverages distributed processing (distcp) to drive the creation of protection copies.
  • The protection process can be independent of Hadoop so that human error and malware are less likely to propagate to the protection process and copies i.e. delineation of controls.
  • All the protection copies are deduplicated against each other and stored efficiently for versioning of large data sets.
  • The protection copies are stored in their native format and readily accessible directly from Hadoop for processing without requiring special access, tools or having to restore them back into HDFS first.
  • The protection copies can be replicated between Data Domain systems for efficient offsite protection.

One other possibility that we have yet to exploit is distributing the deduplication process. In the current strategy Data Domain is required to perform the deduplication processing on behalf of the Hadoop data nodes.

[Diagram: deduplication performed centrally by the Data Domain system on behalf of the Hadoop data nodes]

What would make a great deal of sense is if we could distribute the deduplication processing to the Hadoop data nodes. That would allow the protection strategy to scale uniformly. Data Domain Boost is designed to do exactly that. It is a software development kit (SDK) that allows deduplication processing to be embedded in the data sources (applications).

[Diagram: deduplication processing distributed to the Hadoop data nodes via DD Boost]

As of this time the SDK is not in the public domain and there is no integration with distcp. If you think this would be a worthwhile development please leave a comment.

Vendor-specific Data Protection

Thus far we have touched on the data protection concepts and methods implemented in the standard Apache Hadoop distribution. Now let's look at how each vendor's Hadoop distribution provides protection capabilities unique to that distribution.

The Cloudera CDH distribution provides a Backup and Disaster Recovery solution based on a hardened version of Hadoop distributed copy (distcp). This solution requires a second Hadoop cluster to be established as a replication target.

The Hortonworks distribution comes with Apache Falcon. This provides a data management and processing framework that incorporates governance and regulatory compliance features.

The Pivotal HD distribution provides specific data protection tools for HAWQ, the SQL query engine running on top of Hadoop.

The MapR distribution has taken a different approach to addressing protection. They developed a proprietary filesystem which is API-compatible with HDFS. This is a major shift from the standard Apache distribution and provides a rich set of data services that can be used for protection including support for atomic snapshots.

There are also a number of tool vendors that can be used for Hadoop protection.

WANdisco is an extension built on top of Apache Hadoop (or any of its supported distribution partners) that is designed to stretch a Hadoop cluster across sites, providing a non-stop architecture. The WANdisco solution does not appear to address versioning, which I consider a core requirement of a data protection solution.

A number of other tools exist from Syncsort and Attunity to move data in and out of Hadoop.

If I have omitted any others feel free to leave a comment.

Apart from software-derived solutions there are converged solutions such as those offered by EMC, where Pivotal HD, Cloudera CDH or Hortonworks runs on Isilon OneFS, which supports HDFS natively. In these architectures protection can be provided by the Isilon intelligent storage infrastructure. Isilon OneFS supports snapshots which can be replicated and/or rolled over (at a file or directory level) to alternative storage for data protection and long-term preservation.

Application Protection rather than File Protection

There is a significant caveat with all data protection methods we have discussed to date. We have discussed how to protect data stored in HDFS. What we have not considered is the applications layered on top of Hadoop.

HDFS file recovery does not constitute application recovery. In order to provide application recovery we need to consider which files require protection and whether the application can be put into a consistent state at the time the protection copy is created. If we cannot guarantee consistency then we need to consider whether the application can be brought into a consistent state after recovery from an inconsistent copy.

Application consistency is usually a property of the application. For example, transaction processing applications supported by relational databases provide consistency through ACID (Atomicity, Consistency, Isolation, Durability) properties. As part of the protection method we would call on the ACID properties of the application to create a consistent view in preparation for the copy process. If the application does not provide ACID properties then we rely on a volume, file system or storage-based function that allows IO to be momentarily frozen so that an atomic point-in-time view of an object (or group of objects) can be taken.

In the case of the standard Apache Hadoop architecture we would expect HDFS snapshots to provide consistent views of data sets; however, as we demonstrated, this is not the case for files that are open for write. And since we don't have the luxury of intelligent storage, we are at the mercy of the application to provide hooks that allow us to take consistent point-in-time views of application data sets. Without this functionality we would need to resort to archaic practices analogous to cold backups (i.e. taking the application offline during the HDFS snapshot operation), which is simply not feasible at scale.

HBase is one example of an application layered on top of Hadoop. This technology introduces the concept of tables to Hadoop. HBase includes its own snapshot implementation called HBase snapshots. This is necessary as only HBase understands the underlying structure, relationships and states of the data sets it layers on top of HDFS. However, even HBase snapshots are not perfect. The Apache HBase Operational Management guide includes the following warning:

There is no way to determine or predict whether a very concurrent insert or update will be included in a given snapshot, whether flushing is enabled or disabled. A snapshot is only a representation of a table during a window of time. The amount of time the snapshot operation will take to reach each Region Server may vary from a few seconds to a minute, depending on the resource load and speed of the hardware or network, among other factors. There is also no way to know whether a given insert or update is in memory or has been flushed.

While HBase understands how to snapshot a table it cannot guarantee consistency. Whether this presents a problem for you will depend on your workflow. If atomicity and consistency are important then some synchronisation with application workflows will be necessary to provide consistent recovery positions and versioning of application workloads.
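For reference, this is roughly what table-level protection looks like with HBase snapshots; the table name, snapshot name and target cluster below are placeholders:

[root@hadoop ~]# echo "snapshot 'mytable', 'mytable-20150107'" | hbase shell
[root@hadoop ~]# hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot mytable-20150107 -copy-to hdfs://backupcluster:8020/hbase -mappers 16

The first command asks HBase to snapshot the table (metadata plus references to the underlying HFiles); the second runs a MapReduce job to export the snapshot and its data to another cluster, mirroring the distributed copy approach discussed earlier. The consistency caveat quoted above still applies to whatever was in flight when the snapshot was taken.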

What are we Protecting against anyway?

I think we have exhausted the data protection options available to Hadoop environments. What I would like to end with is helping you answer the question: are Hadoop's inbuilt protection properties good enough?

When we think about data protection we usually think in terms of recovery point objective and recovery time objective. What we rarely consider are the events we are protecting against and whether the protection methods and controls we employ are effective against those events.

Whilst it is not my intention to raise fear and doubt, it is important to acknowledge there is no such thing as software that does not fail unexpectedly. HDFS has had a very good track record of avoiding data loss. However, despite this record, software bugs that can lead to data loss still exist (see HDFS-5042). The research community has also weighed in on the topic with a paper titled HARDFS: Hardening HDFS with Selective and Lightweight Versioning. It is worth reading if you want to understand what can go wrong.

To help you answer the question, are Hadoop’s inbuilt protection properties good enough, we need to explore the events that can lead to data loss and then assess how Hadoop’s inbuilt protection properties would fare in each case.

Below is a list of the events I come across in the Enterprise that can result in data loss.

  • Data corruption – usually a by-product of hardware and/or software failure. An example may be physical decay of the underlying storage media, sometimes referred to as bit rot.
  • Site failure – analogous to a disaster event that renders the site inoperable. An example may be a building fire, systematic software failure or malware event.
  • System failure – individual server, storage or network component failure. An example may be a motherboard or disk failure.
  • Operational failure – human error resulting from the day-to-day operations of the environment. An example may be an operator re-initialising the wrong system and erasing data.
  • Application software failure – failures of the application software layered on top of the infrastructure. An example may be a coding bug.
  • Infrastructure software failure – software failures relating to the infrastructure supporting the applications. An example may be a disk firmware or controller software bug.
  • Accidental user event – accidental user-driven acts that require data recovery. An example may be a user deleting the wrong file.
  • Malicious user event – intentional user-driven acts designed to harm the system. An example may be a user re-initialising a file system.
  • Malware event – automated attacks designed to penetrate and harm the system. An example was the recent malware attack that encrypted the data stored on Synology NAS devices and demanded payment in exchange for the encryption key.

Associated with each event is the probability of occurrence. Probability is influenced by many factors: environment, technology, people, processes, policies and experience. Each of these may help or hinder our ability to counter the risk of these events occurring. It would be unfair to try and generalise the probability of these events occurring in your environment (that is left up to the reader), so for the purpose of comparison I have assumed all events are equally probable and rated Hadoop’s tolerance to each using a simple low, medium and high scale. It is important to acknowledge these are my ratings based on my judgement and my experience. By all means, if you have a different view, leave a comment.

Event | Rating | Rationale
Data corruption | High | Support for N-way replication, data scanning and verify-on-read provides significant scope to respond to data corruption events.
Site failure | Medium | A Hadoop cluster by design is isolated to one site. Hadoop distributed copy (DistCp) can be used to create offsite copies; however, there are challenges relating to application consistency and security that need careful consideration in order to maintain recovery positions.
System failure | Medium | The Hadoop architecture is fault tolerant and designed to tolerate server and network failures. However, the reliability provided by N-way replication is relative to data node count: as node count increases, reliability decreases. To maintain a constant probability of data loss the replication factor must be adjusted as data nodes are added.
Operational failure | Medium | Hadoop lacks role-based access controls over Hadoop functions. This makes it difficult to enforce system-wide data protection policies and standards. For example, a data owner can change the replication factor on files they have write access to. This can lead to compromises in data protection policies which cannot be prevented, but can be audited by enabling HDFS audit logging (see the sketch following this table).
Application software failure | Low | Data protection strategies that share the same software and controls as the application being protected are more vulnerable to software (and operational) failures that can propagate to the protection strategy. To address this it is common practice to apply defence-in-depth principles: a layered approach to data protection that mitigates the risk of a failure (e.g. software) compromising both the environment under protection and the protection strategies.
Infrastructure software failure | High | One can argue the Hadoop architecture has less reliance on infrastructure software than Enterprise architectures that rely on centralised storage systems. The nodes can be made up of a diverse set of building blocks, which mitigates against a common, widespread infrastructure software failure.
Accidental user event | Medium | HDFS trash allows a user to recover from user-driven actions such as accidental file deletion. However, it can be bypassed and is only implemented in the HDFS client. Furthermore, if the event goes unnoticed and trash expires, data loss occurs. There are documented cases of this happening.
Malicious user event | Low | If an authorised malicious user (i.e. the insider threat) wanted to delete their data sets they could do so using standard Hadoop privileges. HDFS trash can be bypassed or expunged, and HDFS snapshots of a user’s directories can be removed by that user even if the snapshots were created by the super user. Furthermore, in a secure relationship between two HDFS clusters the distcp end points are implicitly trusted and vulnerable by association.
Malware event | Low | A Hadoop client with super user privileges has access to the entire Hadoop cluster, including all inbuilt protection methods.
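
To make the operational, accidental and malicious event ratings more concrete, here is a small illustrative sketch (the paths are assumptions) showing how a user with ordinary write access can weaken or bypass the inbuilt protections from any Hadoop client:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InbuiltProtectionGapsSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // A data owner can silently lower the replication factor on any file
        // they can write to, weakening the data protection policy.
        fs.setReplication(new Path("/data/example/part-00000"), (short) 1);

        // Deleting through the FileSystem API bypasses HDFS trash entirely;
        // trash is only applied by the HDFS shell client.
        fs.delete(new Path("/data/example"), true);

        fs.close();
    }
}
```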

Hadoop is very reliable when it comes to protecting against data corruption and physical component failures. Where Hadoop’s inbuilt protection strategies fall short is in protecting against application software failures and human failures (intentional or otherwise).

To address these events we need to consider taking a copy of the data and placing it under the control of a different (and diverse) system. Separation of duties and system diversification are important properties of a data protection strategy because they minimise the risk of events propagating to secondary data sources (i.e. protection copies).

The idea is analogous to diversifying investments across fund managers and asset classes. When funds are split between different fund managers and asset classes, the likelihood of one fund manager or asset class affecting the others is reduced. In other words, we hedge our risk.

The same principle applies to data protection. When there is separation of duties and diversification between primary and protection storage, the likelihood of cross-contamination from either human events (accidental or otherwise) or software failures is minimised.

Taking copies of Hadoop data sets may sound daunting to begin with; however, we must remember to apply the hot, warm and cold data concepts to reduce the frequency (how often) and the volume (how much) of data that needs to be processed to support our data protection requirements.

To achieve that we need to understand our data. First, we need to understand the value of the data and what the business impact would be if the data was unavailable or, worse, lost. Second, based on the business impact analysis (BIA), we need to determine how often and how much data needs to be protected, and/or whether there are other ways to reconstitute the data from alternative sources. Once we understand these classifications we can devise policies aligned with methods to protect the data.

If the data has low business value then it may be appropriate to utilise the method with the lowest cost, for example HDFS trash with two replicas. If the data has high availability requirements and high business value then it would be appropriate to utilise multiple methods that together provide the fastest time to recovery and the broadest event coverage (i.e. the lowest risk).
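
As an illustration only, the following sketch expresses such a classification as a simple mapping of hypothetical data classes to protection methods; the classes, methods and replica counts are assumptions, not recommendations.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class ProtectionPolicySketch {

    // Hypothetical data classifications derived from a business impact analysis.
    enum DataClass { LOW_VALUE, REPRODUCIBLE, HIGH_VALUE }

    public static void main(String[] args) {
        Map<DataClass, List<String>> policy = new EnumMap<>(DataClass.class);

        // Low value: rely on the cheapest inbuilt methods.
        policy.put(DataClass.LOW_VALUE, List.of("HDFS trash", "replication factor 2"));

        // Reproducible: keep no copy; document how to rebuild from the source systems.
        policy.put(DataClass.REPRODUCIBLE, List.of("re-ingest from source"));

        // High value: layer multiple methods with opposing technical and risk profiles.
        policy.put(DataClass.HIGH_VALUE, List.of(
                "HDFS/HBase snapshots for fast recovery",
                "scheduled copy to a separate, independently controlled system"));

        policy.forEach((dataClass, methods) ->
                System.out.println(dataClass + " -> " + methods));
    }
}
```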

Methods providing fast time to recovery and broad event coverage rely on opposing technical strategies.

Fast time to recovery methods rely on technology that is tightly coupled with the primary data source and structure, e.g. a snapshot that is not diversified from the primary data source or structure. Methods that yield the broadest event coverage and the lowest risk profile rely on technologies that are loosely coupled with (or decoupled from) the primary data source and structure, e.g. a full independent copy on a diverse storage device and file system that is independently verified.

Achieving both fast time to recovery and the broadest event coverage requires the adoption of multiple complementary protection methods with opposing technical and risk profiles.

Summary

It is certainly clear in my mind that there is no universal approach to providing data protection for large-scale Hadoop environments. The options available to you will ultimately depend on:

  • The Hadoop architecture you choose to adopt
  • The Hadoop distribution and the options it offers relative to your requirements
  • The value of data stored in Hadoop and your tolerance for risk
  • The applications layered on top of Hadoop
  • The application workflows and your ability to integrate with them for the purpose of protection and versioning

When I first thought about writing this article I never expected it would reach over 11,000 words. Looking back, I think if you have reached this point you would agree that data protection for Hadoop is a broad topic that warrants special attention.

I hope you found this useful and I look forward to the continued adoption of Hadoop in the Enterprise. This will drive data protection vendors to embrace the technology and mature our approach to protecting Hadoop environments.
