分享一个mapreduce程序,作用:将hdfs文件数据批量加载进redis内存数据库:
1.源代码:
/**
* Program:
* The program is used to batch load data to redis by Jedis.
* History:
* Created by Qingshou Chen on 15/11/13.
*/
package com.asiainfo.bdcenter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.io.IOException;
import java.util.*;
public class BatchToRedis extends Configured implements Tool{
/**
* 继承Configured,implements Tool 可以方便读取命令行-conf -D 配置信息和加载其他配置文件(通过addResource方法)
*/
/**
* 日志处理
*/
private static Logger logger = LoggerFactory.getLogger(BatchToRedis.class);
public int run(String[] args) throws Exception{
/**
* 检查调用参数是否正确
*/
if (args.length != 0){
System.err.println("Usage:hadoop jar BatchToRedis.jar");
System.err.println("Attention:Please set parameters at config.xml in the same path of BatchToRedis.jar");
System.exit(-1);
}
/**
* 初始化配置 config.xml在src目录下,执行jar包时需要将该文件跟jar包放在同一个目录下
*/
Configuration conf = getConf();
conf.addResource(new Path("./config.xml"));
/*Date date = new Date();
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sdf = df.format(date);*/
String jobname = "BatchToRedis:"+conf.get("redis.table.name");
logger.info("***************JOB:"+jobname+" START***************");
logger.info("****This Program is used by batch load data to redis cluster!****");
logger.info("****The result is set hashkeys like:hmset table_name:key fieldname1 fieldvalue1 [fieldnameN fieldvalueN]****");
logger.info("***************"+"INPUT PATH:"+conf.get("input.path")+"***************");
logger.info("***************"+"DATA FIELD NAME:"+conf.get("data.field.name")+"***************");
logger.info("***************"+"DATA FIELD SPLIT:"+conf.get("data.field.split")+"***************");
logger.info("***************"+"REDIS CLUSTER:"+conf.get("redis.cluster.node.list")+"***************");
/**
* SET JOB
*/
Job job = Job.getInstance(conf,jobname);
job.setJarByClass(BatchToRedis.class);
FileInputFormat.addInputPath(job, new Path(conf.get("input.path")));
FileInputFormat.setMinInputSplitSize(job,1);
FileInputFormat.setMaxInputSplitSize(job,Long.parseLong(conf.get("map.split.size"))*1024*1024);
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(BatchToRedisMapper.class);
job.setReducerClass(BatchToRedisReducer.class);
job.setNumReduceTasks(0);
// job.setOutputKeyClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(NullOutputFormat.class);
/**
* 显示job执行时间和结果
*/
long currentTime = System.currentTimeMillis();
boolean success = job.waitForCompletion(true);
logger.info("***************Job Escape: " + StringUtils.formatTimeDiff(System.currentTimeMillis(), currentTime)+"***************");
if (!success)
{
logger.info("***************JOB FAILED***************");
return 1;
}
logger.info("***************JOB END SUCCESSFULL***************");
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new BatchToRedis(), args);
System.exit(exitCode);
}
/**
* map
*/
public static class BatchToRedisMapper extends Mapper<LongWritable, Text, Text, Text> {
private JedisCluster jc;
protected void setup(Context context) throws IOException{
/**
* 连接redis集群
*/
Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
Configuration conf = context.getConfiguration();
String[] redisnodes = conf.get("redis.cluster.node.list").split(",");
for(String redisnode : redisnodes){
jedisClusterNodes.add(new HostAndPort(redisnode.split(":")[0],Integer.parseInt(redisnode.split(":")[1])));
}
this.jc = new JedisCluster(jedisClusterNodes);
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
Configuration conf = context.getConfiguration();
String[] fields = line.split(conf.get("data.field.split"));
String[] fieldnames = conf.get("data.field.name").split(",");
Map<String, String> map = new HashMap<String, String>();
/**
* 设置主键 table_name+主键属性(需放在数据文件第一列)
*/
String hashkey = conf.get("redis.table.name") + ":" + fields[0];
/**
* 设置其他属性
*/
for(int i = 1;i<fields.length;i++) {
map.put(fieldnames[i], fields[i]);
}
jc.del(hashkey);
jc.hmset(hashkey, map);
// System.out.println(jc.hget(hashkey,"name"));
}
protected void cleanup(Context context){
/**
* 关闭redis集群
*/
jc.close();
}
}
/**
* reduce
*/
public static class BatchToRedisReducer extends Reducer<Text, Text, Text, IntWritable> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
System.out.println("reduce execute!!!!");
}
}
}
2.配置文件,config.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<!-- 配置map切片大小 单位 MB-->
<property>
<name>map.split.size</name>
<value>32</value>
</property>
<!-- txt文件(数据文件)的路径(HDFS)-->
<property>
<name>input.path</name>
<value>/bdcenter/tmp/test/in</value>
</property>
<!--数据文件字段名,用逗号分隔,注意:主键放第一列! -->
<property>
<name>data.field.name</name>
<value>bill_id,name,age,sex</value>
</property>
<!--数据文件字段分隔符 -->
<property>
<name>data.field.split</name>
<value>,</value>
</property>
<!-- redis数据库中hash 主键中所放的表名-->
<property>
<name>redis.table.name</name>
<value>dw_user</value>
</property>
<!-- Redis cluster node list -->
<property>
<name>redis.cluster.node.list</name>
<value>10.192.168.74:6379,10.192.168.75:6379,10.192.168.76:6379,10.192.168.77:6379,10.192.168.78:6379,10.192.168.79:6379,10.192.168.80:6379,10.192.168.81:6379,10.192.168.82:6379,10.192.168.83:6379</value>
</property>
</configuration>
纪录实际操作过程
内容在个人公众号mangrendd同步更新
相关推荐
此文档在hadoop集群搭建完毕之后,在集群之外搭建一个hadoop集群的开发环境,用于编写hadoop实际处理程序,还包括了如何提交任务等。整体非常详细,如需要其他hadoop集群搭建资源可以联系我,免费给q:1487954071
Hadoop 开发环境搭建第一篇: 1、是用Virtual Box 系统使用的是Redhat ,linux不熟的朋友们可以顺便学习一下linux, 2、环境配置包括防火墙关闭,IP分配,更改域名等 3、服务软件:远程服务SSH等安装
涉及到了Hadoop2.0、Hbase、Sqoop、Flume、Hive、Zookeeper的具体环境搭建
Hadoop开发环境搭建所需文件(hadoop-eclipse-plugin-2.2.0.jar、winutils.exe)
hadoop开发环境搭建教程:利用Cloudera实现Hadoop
hadoop开发环境搭建详细文档,包括CentOS安装及配置,jdk安装及配置,hadoop安装及配置,文档内附环境所需软件分享网址。 CentOS6.5 + jdk1.6 + hadoop
windows下搭建hadoop2.6开发环境详细说明,包括各种错误解决方法及参考文档;包括所需插件及测试代码
windows下搭建hadoop开发环境(Eclipse)
hadoop集群环境的搭建
windows中Hadoop开发环境中用到的bin,因为镜像下载中缺少winutils.exe和hadoop.dll ,所以上传此资源以备使用 windows搭建Hadoop开发环境请参考https://blog.csdn.net/qq_38774450/article/details/108942115
本文档是关于在mac环境下hadoop虚拟集群的搭建,文中详细写了从虚拟机安装到hadoop安装完成的步骤。
Hadoop开发环境搭建Win8+Eclipse+Linux.pdf
hadoop环境搭建和eclipse开发
在一台虚拟机上安装多台linux服务机,并搭建Hadoop集群环境
win7+eclipse+hadoop开发环境搭建[收集].pdf
Hadoop环境搭建Hadoop环境搭建 Hadoop环境搭建
hadoop的环境搭建手册 分布式计算开源框架Hadoop介绍
亲身实验的hadoop 环境搭建 关于很多网上没有讲清楚的地方进行讲解 如静态ip怎么改 而不是到哪改, 还算比较详细 3208006642@qq.com 积分不知道怎么取消 发邮件给我我直接发给你
【大数据入门笔记系列】第五节 SpringBoot集成hadoop开发环境(复杂版的WordCount)前言环境清单创建SpringBoot项目创建包创建yml添加集群主机名映射hadoop配置文件环境变量HADOOP_HOME编写代码添加hadoop依赖jar包...
关于hadoop2.x的安装,其中包括了一个完整的集群以及各种hadoop组件的安装和部署,解压包中包括了四篇的安装教程,超级完整。