Analyzing Twitter Data with Apache Hadoop, Part 2 20 Mar 2014
Gathering Data with Flume
Sources
source是Flume里连接数据源的组件,是数据在Flume数据流流转的起点。Source处理事件,并将它们发送给一个channel。Sources通过收集不连续的数据,并将数据转换为独立的事件,然后使用channel处理事件,可以一次处理一个事件,也可以作为一个batch处理一次处理一批事件。
Flume有两种类型的sources,事件驱动的和可拉取的。事件驱动和可拉取之间的不同在于事件如何生成和处理。事件驱动源通过回调的机制接收事件。与此相反,可拉取的源循环去读取事件。另外一种区分这两种sources的方式是`PUSH V.S. PULL`模型,事件驱动sources是事件推送给sources;可拉取sources是从一个generator中拉取事件。
检查TwitterSource
在前面例子中,我们建立一个叫做TwitterSource的自定义source。为了更深刻的理解source如何运作,让我们看一下如何构建TwitterSource。我们先建立一个TwitterSource的样本文件
/**
* A template for a custom, configurable Flume source
*/
public class BoilerplateCustomFlumeSource extends AbstractSource
implements EventDrivenSource, Configurable {
/**
* The initialization method for the Source. The context contains all the
* Flume configuration info, and can be used to retrieve any configuration
* values necessary to set up the Source.
*/
@Override
public void configure(Context context) {
// Get config params with context.get* methods
// Example: stringParam = context.getString("stringParamName")
}
/**
* Start any dependent systems and begin processing events.
*/
@Override
public void start() {
// For an event-driven source, the start method should spawn
// a thread that will receive events and forward them to the
// channel
super.start();
}
/**
* Stop processing events and shut any dependent systems down.
*/
@Override
public void stop() {
super.stop();
}
}
这样我们就有了一个可配置的source,并且可以将它安装到Flume,尽管现在并不做任何操作。
start()方法包含了大部分source的逻辑。在TwitterSource中,twitter4j库使用下面代码来访问Twitter Streaming API
// The StatusListener is a twitter4j API, which can be added to a Twitter
// stream, and will execute callback methods every time a message comes in
// through the stream.
StatusListener listener = new StatusListener() {
// The onStatus method is a callback executed when a new tweet comes in.
public void onStatus(Status status) {
Map headers = new HashMap();
// The EventBuilder is used to build an event using the headers and
// the raw JSON of a tweet
headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
Event event = EventBuilder.withBody(
DataObjectFactory.getRawJSON(status).getBytes(), headers);
try {
getChannelProcessor().processEvent(event);
} catch (ChannelException e) {
// If we catch a channel exception, it’s likely that the memory channel
// does not have a high enough capacity for our rate of throughput, and
// we tried to put too many events in the channel. Error handling or
// retry logic would go here.
throw e;
}
}
// This listener will ignore everything except for new tweets
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
public void onScrubGeo(long userId, long upToStatusId) {}
public void onException(Exception ex) {}
};
StatusListener实现了一系列回调方法,这些回调方法在收到一个新tweet时被调用,tweet用Status对象表示。StatusListener还有其它回调方法,但是本例中我们只关心新tweets。在TwitterSource中我们可以看到,StatusListener在start()方法中被创建和注册。
再进一步观察,我可以看到为tweet建立一个事件:
headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
Event event = EventBuilder.withBody(
DataObjectFactory.getRawJSON(status).getBytes(), headers));
EventBuilder接口接收一个字节数组和一个可选的headers集合作为参数,创建一个事件,并将该事件放在一个list的末尾。Source处理events并将它们传递给channel。
channel.processEvent(event);
为了连接Twitter API,我们需要访问一些应用级别的敏感信息。在TwitterSource中我们用到了consumerKey和consumerSecret变量。
twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
consumerKey和consumerSecret在哪儿定义呢?这里,这两个变量为配置参数,查看一下configure()方法,我们可以看到这两个变量的定义。
consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
context对象包含所有source的配置参数,它们可以用一系列get方法访问到。
这样,自定义source能够将tweets作为事件来处理。下一步将定义这些事件应该流向到哪里,和它们如何到达那里。
配置Flume Agent
在我们讨论如何实际配置一个Flume的agent之前,我们需要知道一个完整的配置的形式。这个例子中我们使用该配置
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = [required]
TwitterAgent.sources.Twitter.consumerSecret = [required]
TwitterAgent.sources.Twitter.accessToken = [required]
TwitterAgent.sources.Twitter.accessTokenSecret = [required]
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
每一个定义的对象会被其他配置引用。大多数Flume配置项的格式和logj的appenders相似。一个配置项形式如下
[agent_name].[object_type].[object_name].[parameter_name] // [object_type]是sources,channels或者sinks其中之一
Channels
Channels是连接sources和sinks的一条路径,Events由sources添加到channels,然后由sinks从channels中删除。Flume数据流实际上支持多channels,这样可以组成更复杂的数据流,例如为了复制,形成一个像扇形的数据流。
在本例中,我们使用了一个内存channel
TwitterAgent.channels.MemChannel.type = memory
内存channel使用一个内存中的队列来存储事件直到它们准备好被写到一个sink。内存channel在数据流吞吐量较高时比较有用;然而,由于events存储在内存,当agent出现错误时,它们会丢失。如果数据丢失的风险不能被容忍,可以使用其他类型的channel,例如FileChannel。
Sinks
Flume数据流最后一步是sink。Sinks获取events,并将他们发送给配置好的位置,或转发给另外一个agent。在本例中我们使用了HDFS sink,它将事件存储到HDFS的一个指定位置。
我们使用的HDFS sink配置做了许多事情:首先,它用rollCount参数定义了文件大小,这样每个文件将包含10,000条tweets。它也通过设置fileType为DataStream和writeFormat为Text保持了原始数据格式,而不是将数据存储为SequenceFile或其他格式。最有趣的一点,是存储路径参数。
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
文件路径使用一些通配符来使文件存放到一系列表示事件发生的年,月,日,时的文件夹中。例如,一个事件在9/20/2012 3:00PM到达,它将存放在<hdfs://hadoop1:8020/user/flume/tweets/2012/09/20/15>。
时间戳信息来自于哪里?如果你回顾一下,我们在TwitterSource中为每一个事件添加了一个header:
headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
时间戳被用来确定事件的时间戳,并被用来确定事件存放的路径。
运行Agent
现在我们理解了我们source,channel和sink的配置,我们需要运行agent来运行数据流。在我们实际启动agent前,我们需要为agent设置一个合适的名字。
/etc/default/flume-ng-agent文件包含了一个叫做FLUME_AGENT_NAME环境变量。在一个生产系统中,为了简化,FLUME_AGENT_NAME可以设置为运行agent的机器的hostname。在本例中,我们设置它为TwitterAgent。
我们可以用以下命令运行agent
$ /etc/init.d/flume-ng-agent start
当它运行时,我们可以看到/user/flume/tweets目录下会出现一些文件
natty@hadoop1:~/source/cdh-twitter-example$ hadoop fs -ls /user/flume/tweets/2012/09/20/05
Found 2 items
-rw-r--r-- 3 flume hadoop 255070 2012-09-20 05:30 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893253
-rw-r--r-- 3 flume hadoop 538616 2012-09-20 05:39 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893254.tmp
在更多的事件处理后,Flume将文件写到对应的文件夹中。临时文件的后缀名为.tmp,是当前正在写的文件。当Flume确定文件包含了足够的events,或足够的时间来切换文件时,.tmp后缀将被移除。这由HDFS sink中的配置确定,我们在上面已经有所介绍,rollCount和rollInterval参数
原文地址
Analyzing Twitter Data with Apache Hadoop, Part 2: Gathering Data with Flume
下一章介绍使用Hive查询HDFS数据