InjectorJob - Nutch2 24 Mar 2014

InjectorJob is responsible for reading URLs from the files under the seeds directory and creating a WebPage entity from each URL and its accompanying metadata.

In Nutch 2.1, the class that performs the inject task was renamed from Injector to InjectorJob, and the original two MapReduce jobs were simplified into a single one.

Job configuration

public Map<String,Object> run(Map<String,Object> args) throws Exception {
    getConf().setLong("injector.current.time", System.currentTimeMillis());
    Path input;
    Object path = args.get(Nutch.ARG_SEEDDIR);
    if (path instanceof Path) {
      input = (Path)path;
    } else {
      input = new Path(path.toString());
    }
    numJobs = 1;
    currentJobNum = 0;
    currentJob = new NutchJob(getConf(), "inject " + input);
    FileInputFormat.addInputPath(currentJob, input);  // set the seeds directory as the job input
    currentJob.setMapperClass(UrlMapper.class);
    currentJob.setMapOutputKeyClass(String.class);
    currentJob.setMapOutputValueClass(WebPage.class);
    currentJob.setOutputFormatClass(GoraOutputFormat.class); // write the results through Gora
    DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
        String.class, WebPage.class);     // create a DataStore for WebPage
    GoraOutputFormat.setOutput(currentJob, store, true);
    currentJob.setReducerClass(Reducer.class);  // any reducer is fine
    currentJob.setNumReduceTasks(0);  // map-only job: the reduce phase is disabled
    currentJob.waitForCompletion(true);
    ToolUtil.recordJobStatus(null, currentJob, results);
    return results;
}

Mapper

Reads the seed files, extracts the URLs and their metadata, then normalizes and filters each URL.

Input and output

Seed URL file: <byte offset of the line, line text> → <reversed URL, WebPage>
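A seed line may carry optional metadata after the URL as tab-separated name=value pairs, which the mapper below parses. The keys nutch.score and nutch.fetchInterval are treated specially (they set the injected score and fetch interval); all other pairs end up in the WebPage metadata map. A hypothetical seed file (the hostnames and the category key are made-up examples; fields are separated by tab characters):

```
http://www.example.com/	nutch.score=2.5	nutch.fetchInterval=86400	category=news
http://bar.foo.com:8983/to/index.html?a=b
```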

In Nutch 2.x, Gora abstracts how crawl results are stored, so the user can keep Nutch's crawl data in HBase, Cassandra, or MySQL. The stored record looks more like a database row, which is why in 2.x the WebPage entity replaces the CrawlDatum of 1.x.

The URL is reversed and used as the map output key. For example, http://bar.foo.com:8983/to/index.html?a=b becomes com.foo.bar:http:8983/to/index.html?a=b. This layout suits HBase better, because rows from the same domain become adjacent and scans within a domain are faster.
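The reversal above is done by TableUtil.reverseUrl. A minimal sketch of the idea (an assumption, not Nutch's actual implementation, which handles more edge cases such as IP-address hosts):

```java
import java.net.MalformedURLException;
import java.net.URL;

public class ReverseUrlSketch {
  /**
   * Turn http://bar.foo.com:8983/to/index.html?a=b into
   * com.foo.bar:http:8983/to/index.html?a=b so that rows of the
   * same domain sort next to each other in HBase.
   */
  public static String reverseUrl(String urlString) throws MalformedURLException {
    URL url = new URL(urlString);
    // Reverse the dotted host name: bar.foo.com -> com.foo.bar
    String[] parts = url.getHost().split("\\.");
    StringBuilder sb = new StringBuilder();
    for (int i = parts.length - 1; i >= 0; i--) {
      sb.append(parts[i]);
      if (i > 0) sb.append('.');
    }
    // Append the protocol, the port if one is present, then path + query
    sb.append(':').append(url.getProtocol());
    if (url.getPort() != -1) sb.append(':').append(url.getPort());
    sb.append(url.getFile()); // path plus query, e.g. /to/index.html?a=b
    return sb.toString();
  }

  public static void main(String[] args) throws MalformedURLException {
    System.out.println(reverseUrl("http://bar.foo.com:8983/to/index.html?a=b"));
    // prints com.foo.bar:http:8983/to/index.html?a=b
  }
}
```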

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String url = value.toString();

      // if the line contains tabs, everything after the first tab is metadata;
      // entries must be name=value and separated by \t
      float customScore = -1f;
      int customInterval = interval;
      Map<String, String> metadata = new TreeMap<String, String>();
      if (url.indexOf("\t") != -1) {
        String[] splits = url.split("\t");     // split into the URL and its metadata entries
        url = splits[0];
        for (int s = 1; s < splits.length; s++) {
          // find the separator between name and value
          int indexEquals = splits[s].indexOf("=");
          if (indexEquals == -1) {
            // skip anything without a =
            continue;
          }
          String metaname = splits[s].substring(0, indexEquals);
          String metavalue = splits[s].substring(indexEquals + 1);
          if (metaname.equals(nutchScoreMDName)) {
            try {
              customScore = Float.parseFloat(metavalue);
            } catch (NumberFormatException nfe) {
              // malformed score: keep the default
            }
          } else if (metaname.equals(nutchFetchIntervalMDName)) {
            try {
              customInterval = Integer.parseInt(metavalue);
            } catch (NumberFormatException nfe) {
              // malformed interval: keep the default
            }
          } else {
            metadata.put(metaname, metavalue);
          }
        }
      }
      try {
        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); // normalize the URL
        url = filters.filter(url); // filter the URL (may return null)
      } catch (Exception e) {
        LOG.warn("Skipping " + url + ":" + e);
        url = null;
      }
      if (url == null)
        return;

      String reversedUrl = TableUtil.reverseUrl(url);   // reverse the URL for the row key
      WebPage row = new WebPage();
      row.setFetchTime(curTime);
      row.setFetchInterval(customInterval);

      // now add the metadata
      Iterator<String> keysIter = metadata.keySet().iterator();
      while (keysIter.hasNext()) {
          String keymd = keysIter.next();
          String valuemd = metadata.get(keymd);
          row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
      }

      if (customScore != -1)
          row.setScore(customScore);
      else
          row.setScore(scoreInjected);
      try {
          scfilters.injectedScore(url, row);
      } catch (ScoringFilterException e) {
          if (LOG.isWarnEnabled()) {
              LOG.warn("Cannot filter injected score for url " + url
                      + ", using default (" + e.getMessage() + ")");
          }
      }

      row.putToMarkers(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));

      Mark.INJECT_MARK.putMark(row, YES_STRING);   // mark the row as produced by the inject phase
      context.write(reversedUrl, row);   // write the <reversedUrl, WebPage> pair via Gora
    }
  } // end of UrlMapper

Reducer

InjectorJob uses Hadoop's default Reducer; in fact, since the number of reduce tasks is set to 0, the job is map-only and the mapper output is written directly to the configured output.

Summary

Compared with 1.x, InjectorJob is much simpler: by leaning on the characteristics of stores such as HBase, the deduplication step is no longer needed. One open question remains, though: if a URL already exists in the store and has been fetched (i.e. is marked as fetched), will running InjectorJob again overwrite its status back to injected?