本文共 2918 字,大约阅读时间需要 9 分钟。
import org.apache.hadoop.hbase.HBaseConfiguration; //导入方法依赖的package包/类
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param partitioner Partitioner to use. Pass null
to use
* default partitioner.
* @param quorumAddress Distant cluster to write to; default is null for
* output to the cluster that is designated in hbase-site.xml
.
* Set this String to the zookeeper ensemble of an alternate remote cluster
* when you would have the reduce write a cluster that is other than the
* default; e.g. copying tables between clusters, the source would be
* designated by hbase-site.xml
and this param would have the
* ensemble address of the remote cluster. The format to pass is particular.
* Pass <hbase.zookeeper.quorum>:<
* hbase.zookeeper.client.port>:<zookeeper.znode.parent>
* such as server,server2,server3:2181:/hbase
.
* @param serverClass redefined hbase.regionserver.class
* @param serverImpl redefined hbase.regionserver.impl
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When determining the region count fails.
*/
public static void initTableReducerJob(String table,
Class extends TableReducer> reducer, Job job,
Class partitioner, String quorumAddress, String serverClass,
String serverImpl, boolean addDependencyJars) throws IOException {
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
// Calling this will validate the format
ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if (serverClass != null && serverImpl != null) {
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
}
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
if (addDependencyJars) {
addDependencyJars(job);
}
initCredentials(job);
}
转载地址:http://beexl.baihongyu.com/