In the previous posts, Sanxian showed how a standalone program can use Hadoop to build a Lucene index; in this post, we look at how to build the index with MapReduce. The code is as follows:
package com.mapreduceindex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.util.Version;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.mortbay.log.Log;
import org.wltea.analyzer.lucene.IKAnalyzer;

/**
 * Building a Lucene index with MapReduce.
 *
 * @author qindongliang
 * Hadoop version 2.2.0
 * Lucene version 4.8.0
 * Solr version 4.8.0
 **/
public class BuildIndexMapReduce {

    /**
     * Gets an IndexWriter that writes to HDFS.
     * @param outDir the output directory of the index
     * @return an IndexWriter
     **/
    public static IndexWriter getIndexWriter(String outDir) throws Exception {
        Analyzer analyzer = new IKAnalyzer(true); // IK analyzer (smart mode)
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_48, analyzer);
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.46.32:9000/"); // HDFS address
        Path path = new Path("hdfs://192.168.46.32:9000/qin/" + outDir); // index directory
        HdfsDirectory directory = new HdfsDirectory(path, conf);
        long heapSize = Runtime.getRuntime().totalMemory() / 1024L / 1024L;  // currently allocated heap (MB)
        long heapMaxSize = Runtime.getRuntime().maxMemory() / 1024L / 1024L; // maximum heap (MB)
        config.setRAMBufferSizeMB((heapMaxSize - heapSize) * 0.7); // use 70% of the free heap as the RAM buffer
        return new IndexWriter(directory, config);
    }

    /**
     * Index helper class.
     **/
    public static class LuceneDocumentUtil {
        public static Document getDoc(String field, String value) {
            Document d = new Document();
            // Simulates loading a schema; a real implementation could build the
            // fields flexibly from a Solr-style schema file.
            d.add(new TextField("content", value, Store.YES));
            return d;
        }
    }

    /**
     * @author qindongliang
     */
    private static class BuildIndexMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

        IndexWriter iw;
        List<Document> documents = new ArrayList<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Random rd = new Random();
            int i = rd.nextInt(99999999); // the index directory name could use a UUID to guarantee uniqueness
            try {
                iw = getIndexWriter(i + ""); // initialize the IndexWriter
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            Log.info("Input record: " + value.toString());
            String values[] = value.toString().split("\1"); // each line of the indexed file is split on the \1 separator
            if (values.length < 2) { // skip malformed lines that lack the separator
                return;
            }
            String fieldName = values[0];
            String fieldValue = values[1];
            Document d = LuceneDocumentUtil.getDoc(fieldName, fieldValue);
            if (d == null) {
                return;
            }
            documents.add(d);
            if (documents.size() > 5000) { // commit in batches
                iw.addDocuments(documents);
                documents.clear();
            }
        }

        /***
         * At the end of the map task, flush the remaining documents and commit the index.
         ***/
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (documents.size() > 0) {
                iw.addDocuments(documents);
            }
            if (iw != null) {
                iw.close(true); // wait for merges to finish before closing
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.jar", "myjob.jar");
        conf.set("fs.defaultFS", "hdfs://192.168.46.32:9000");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", "192.168.46.32:8032");
        /** Job setup **/
        // Job job = new Job(conf, "testwordcount"); // deprecated API
        Job job = Job.getInstance(conf, "build index");
        job.setJarByClass(BuildIndexMapReduce.class);
        System.out.println("ResourceManager: " + conf.get("yarn.resourcemanager.address"));
        job.setNumReduceTasks(0); // map-only job, no reduce tasks
        job.setMapperClass(BuildIndexMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        String path = "hdfs://192.168.46.32:9000/qin/output";
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path(path);
        if (fs.exists(p)) {
            fs.delete(p, true);
            System.out.println("The output path already existed and has been deleted!");
        }
        FileInputFormat.setInputPaths(job, "hdfs://192.168.46.32:9000/qin/indexinput");
        FileOutputFormat.setOutputPath(job, p);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
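A practical note on the submission code above: conf.set("mapreduce.job.jar", "myjob.jar") assumes the compiled job classes have already been packaged into a jar named myjob.jar in the program's working directory, for example with the JDK's jar tool (jar cvf myjob.jar -C bin ., assuming the classes were compiled into a bin directory); if that file does not exist, submission fails with a "File myjob.jar not exist" style error.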
The console output is as follows:
ResourceManager: 192.168.46.32:8032
INFO - RMProxy.createRMProxy(56) | Connecting to ResourceManager at /192.168.46.32:8032
WARN - JobSubmitter.copyAndConfigureFiles(149) | Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO - FileInputFormat.listStatus(287) | Total input paths to process : 3
INFO - JobSubmitter.submitJobInternal(394) | number of splits:3
INFO - Configuration.warnOnceIfDeprecated(840) | user.name is deprecated. Instead, use mapreduce.job.user.name
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.jar is deprecated. Instead, use mapreduce.job.jar
INFO - Configuration.warnOnceIfDeprecated(840) | fs.default.name is deprecated. Instead, use fs.defaultFS
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.job.name is deprecated. Instead, use mapreduce.job.name
INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
INFO - JobSubmitter.printTokens(477) | Submitting tokens for job: job_1407866786826_0001
INFO - YarnClientImpl.submitApplication(174) | Submitted application application_1407866786826_0001 to ResourceManager at /192.168.46.32:8032
INFO - Job.submit(1272) | The url to track the job: http://h1:8088/proxy/application_1407866786826_0001/
INFO - Job.monitorAndPrintJob(1317) | Running job: job_1407866786826_0001
INFO - Job.monitorAndPrintJob(1338) | Job job_1407866786826_0001 running in uber mode : false
INFO - Job.monitorAndPrintJob(1345) | map 0% reduce 0%
INFO - Job.monitorAndPrintJob(1345) | map 33% reduce 0%
INFO - Job.monitorAndPrintJob(1345) | map 100% reduce 0%
INFO - Job.monitorAndPrintJob(1356) | Job job_1407866786826_0001 completed successfully
INFO - Job.monitorAndPrintJob(1363) | Counters: 27
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=238179
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=67091
		HDFS: Number of bytes written=9708
		HDFS: Number of read operations=147
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=75
	Job Counters
		Launched map tasks=3
		Data-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=81736
		Total time spent by all reduces in occupied slots (ms)=0
	Map-Reduce Framework
		Map input records=166
		Map output records=0
		Input split bytes=326
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=11308
		CPU time spent (ms)=9200
		Physical memory (bytes) snapshot=469209088
		Virtual memory (bytes) snapshot=2544439296
		Total committed heap usage (bytes)=245399552
	File Input Format Counters
		Bytes Read=62970
	File Output Format Counters
		Bytes Written=0
The data source for this test consists of three files. They happen to be small files; in real production you should avoid storing small files on HDFS and instead merge them into larger files ahead of time. Because three test files were used, three map tasks were launched and the job produced three separate indexes. If needed, the multiple generated indexes can then be merged by a job with a single reduce task, as sketched below.
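To make that merge step concrete, here is a minimal sketch of combining the per-map indexes into one index with Lucene's IndexWriter.addIndexes. This is not part of the original post: the class name, the /qin/indexes parent directory (which assumes the per-map indexes were written under a common parent), and the merged-index path are all assumptions for illustration.

package com.mapreduceindex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Version;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.wltea.analyzer.lucene.IKAnalyzer;

/**
 * Minimal sketch: merge the per-map indexes into a single HDFS index.
 * All paths below are hypothetical placeholders.
 */
public class MergeIndexes {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.46.32:9000");
        // Destination directory for the merged index (hypothetical path).
        HdfsDirectory merged = new HdfsDirectory(new Path("hdfs://192.168.46.32:9000/qin/mergedindex"), conf);
        Analyzer analyzer = new IKAnalyzer(true);
        IndexWriter writer = new IndexWriter(merged, new IndexWriterConfig(Version.LUCENE_48, analyzer));
        FileSystem fs = FileSystem.get(conf);
        // Walk the assumed parent directory; each map task wrote one index directory under it.
        for (FileStatus status : fs.listStatus(new Path("/qin/indexes"))) {
            if (status.isDirectory()) {
                Directory shard = new HdfsDirectory(status.getPath(), conf);
                writer.addIndexes(shard); // copies the shard's segments into the merged index
            }
        }
        writer.forceMerge(1); // optionally squash the result into a single segment
        writer.close();
    }
}

Run once after the MapReduce job finishes, this leaves a single directory that an IndexSearcher can open directly; alternatively, the same addIndexes call could live in the reducer of a one-reduce job, as suggested above.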
Comments

3rd floor: RRobinson 2014-12-04
Sanxian, parallel index building is covered now; what about querying?

2nd floor: zhuhongming123 2014-11-26
Sanxian, how is the myjob.jar file produced? Following your approach, I got a "File myjob.jar not exist" exception.

1st floor: RRobinson 2014-08-12
Finally, the article I have been waiting for...