Indexing Twitter Data with Elasticsearch

Today I would like to share some experiences we had while indexing Twitter data with Elasticsearch. First, some background: at L3S Research Center, we have been collecting the 1% sample stream from Twitter for more than two years, using the excellent Twitter Tools from Jimmy Lin. The data is stored in HDFS on a cluster on which Elasticsearch is also running. To index the data efficiently in Elasticsearch, it is necessary to use the Elasticsearch for Hadoop library, which tries to ensure that each Elasticsearch node indexes the data from HDFS that the node stores locally. We use Pig to read and index the data. All scripts that are mentioned (and linked) in this post can be found in my twitter-toolbox project.

Preliminaries

The stored Twitter stream resides in compressed text files where each line contains a tweet in JSON format. One problem we have with these files is that they contain status messages like

Stream closed.
Stream closed.
Waiting for 250 milliseconds
Establishing connection.
Connection established.
Receiving status stream.

which confuse either Elasticsearch or the JsonLoader of Pig. Hence, we need to filter out these lines. There are two options for reading, filtering, and writing the data, which I describe below.

Index optimization

Although Elasticsearch can create the index automatically when the first documents are stored, I highly recommend fine-tuning the analysis and indexing parameters. For this purpose, I have published an optimized index configuration that I will describe in a separate blog post (some information can be found here). To create an index with that configuration, you can use either the script elasticsearch.sh or the following command:

curl -XPUT $HOST/$INDEX -d @index_config.json

Proposals for improving the configuration are welcome!
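
For scripted setups, the same PUT request can be sketched in Python. This is a minimal sketch, not part of the twitter-toolbox scripts: the function name, the host URL, the index name, and the placeholder settings are assumptions for illustration, and the placeholder settings are not the published configuration. The explicit Content-Type header is included because newer Elasticsearch versions reject request bodies without it.

```python
import json
import urllib.request


def build_create_index_request(host, index, config):
    """Build (but do not send) a PUT request that creates `index` with `config`."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/{index}",
        data=body,
        method="PUT",
        # Newer Elasticsearch versions require an explicit content type.
        headers={"Content-Type": "application/json"},
    )


# Placeholder settings for illustration only, not the published configuration.
example_config = {"settings": {"number_of_shards": 1}}
request = build_create_index_request(
    "http://localhost:9200", "twitter_test", example_config
)
print(request.get_method(), request.full_url)
# Sending it would be: urllib.request.urlopen(request)
```

The request is only built here, so the sketch can be inspected without a running cluster.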

Reading JSON and writing tuples

We can read and parse the JSON data from the Twitter stream with Pig and send the resulting tuples to Elasticsearch. For this, we use the JsonLoader from Twitter’s ElephantBird library, since it is more robust against broken JSON:

records = LOAD '$DATA' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad') AS (json:map[]);

We can then keep only the actual tweets, e.g., by checking for the key created_at, and thereby ignore “delete” statements and status messages (though I still have to check whether this is really necessary):

tweets = FILTER records BY json#'created_at' IS NOT NULL;

Finally, we can store the tuples in Elasticsearch:

STORE tweets INTO '$INDEX' USING org.elasticsearch.hadoop.pig.EsStorage
         ('es.http.timeout = 5m', 'es.index.auto.create = true', 'es.nodes = $HOST' );
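
To see what the created_at filter in this pipeline keeps and drops, the same idea can be tried locally on a few lines. The following is a minimal Python sketch, not the Pig code itself: the function name and the sample lines are made up for illustration, and the delete statement only imitates the general shape of such records.

```python
import json


def classify_line(line):
    """Return 'tweet', 'other_json', or 'not_json' for one line of a stream file."""
    try:
        record = json.loads(line)
    except ValueError:
        return "not_json"  # status messages such as "Stream closed."
    if isinstance(record, dict) and record.get("created_at") is not None:
        return "tweet"
    return "other_json"  # e.g. delete statements, which have no created_at


# Illustrative sample lines, not real stream data.
sample = [
    '{"created_at": "Wed Dec 31 23:02:10 +0000 2014", "id": 550426718000144384}',
    '{"delete": {"status": {"id": 1, "user_id": 2}}}',
    "Stream closed.",
    "Waiting for 250 milliseconds",
]
kinds = [classify_line(line) for line in sample]
print(kinds)
```

Only lines classified as tweets would pass the filter; the status messages fail already at the parsing step.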

Problems with that approach

Indexing one day of data with this approach, using the script load_elasticsearch.pig, was no problem (and took just a few minutes). However, the indexed documents do not contain any data:

./elasticsearch.sh  -D KYqKK5rqRZWu49HpRlZhnA twitter_test_rja
querying index twitter_test_rja for doc KYqKK5rqRZWu49HpRlZhnA
{
  "_index" : "twitter_test_rja",
  "_type" : "tweet",
  "_id" : "KYqKK5rqRZWu49HpRlZhnA",
  "exists" : false
}

I have not yet found the reason for this problem. Suggestions are welcome! In effect, this means that the approach is not feasible.

Nevertheless, I tested the script on one month of data, where it failed after 50% due to node failures:

Could not write all entries [384/16000] (maybe ES was overloaded?).
Bailing out...

and some lines like

node05.ib_8041:2015-03-12 14:54:54,474 ERROR [main] org.elasticsearch.hadoop.rest.NetworkClient: Node [Read timed out] failed (10.10.1.5:9201); selected next node [10.10.1.2:9201]

in the log files.

Given that neither the load nor the memory consumption on the nodes was particularly high, I do not know what caused these failures.

Finally, the script currently does not work with an Elasticsearch index that was created using the optimized configuration described above. In that case, I get errors like

Error: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: Found unrecoverable error [failed to parse doc to extract routing/timestamp/id]; Bailing out..

These problems have been described here and here; they are caused by the missing data described above.

Another observation I made during debugging: when I stored a data sample to disk using PigStorage, it contained errors, since newlines (\n) were interpreted rather than escaped. Using JsonStorage, however, they were escaped properly. I have not yet found out why this happens.
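
The difference between an interpreted and an escaped newline can be illustrated outside of Pig. The following Python sketch does not explain why PigStorage behaves this way; it only shows, on a made-up record, why a raw newline breaks a line-oriented file while a JSON-serialized record stays on one line.

```python
import json

# Made-up record whose text field contains a real newline character.
tweet = {"id": 1, "text": "first line\nsecond line"}

# Writing the field value as-is puts the real newline into the output,
# so one record spans two lines of a line-oriented file.
raw_record = f'{tweet["id"]}\t{tweet["text"]}'

# Serializing to JSON escapes the newline as a backslash followed by "n",
# so the record stays on a single line and can be parsed back.
json_record = json.dumps(tweet)

print(len(raw_record.splitlines()))
print(len(json_record.splitlines()))
```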

Reading and writing strings

At first, I thought that parsing the JSON and writing tuples was the preferred way of dealing with the data, since it also enables further manipulation (e.g., filtering) within Pig. However, since this did not work, I tried another option: reading the JSON as a plain string and letting Elasticsearch do the parsing (cf. the option es.input.json). To do so, we load each line as a chararray:

records = LOAD '$DATA' USING PigStorage() AS (json:chararray);

and then keep only the lines that contain tweet data (ignoring the delete statements and the status messages):

tweets = FILTER records BY SUBSTRING(json, 0, 13) == '{"created_at"';

Finally, we store the tuples in Elasticsearch:

STORE tweets INTO '$INDEX' USING org.elasticsearch.hadoop.pig.EsStorage
         ('es.http.timeout = 5m', 'es.index.auto.create = true', 'es.input.json = true', 'es.nodes = $HOST');

The only difference in the last statement compared to the initial approach is that es.input.json is set to true.
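
The prefix check used in the FILTER statement can be reproduced locally to see which lines it keeps. This is a minimal Python sketch with made-up sample lines and a hypothetical helper name. It also shows a limitation of the string-based check: it relies on created_at being the first key of each tweet, so a tweet serialized with a different key order would be dropped.

```python
PREFIX = '{"created_at"'


def is_tweet_line(line):
    """Mirror of the Pig filter: compare the leading characters with PREFIX."""
    return line[:len(PREFIX)] == PREFIX


# Illustrative sample lines, not real stream data.
lines = [
    '{"created_at":"Wed Dec 31 23:02:10 +0000 2014","id":550426718000144384}',
    '{"delete":{"status":{"id":1,"user_id":2}}}',
    "Connection established.",
    # Valid tweet JSON with a different key order: the prefix check misses it.
    '{"id":550426718000144384,"created_at":"Wed Dec 31 23:02:10 +0000 2014"}',
]
kept = [is_tweet_line(line) for line in lines]
print(len(PREFIX), kept)
```

The length of the prefix matches the stop index 13 passed to SUBSTRING in the Pig statement.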

That approach indeed worked, even together with the optimized index configuration! As an example:

./elasticsearch.sh  -D 550426718000144384 twitter_test_rja
querying index twitter_test_rja for doc 550426718000144384
{
  "_index" : "twitter_test_rja",
  "_type" : "tweet",
  "_id" : "550426718000144384",
  "_version" : 1,
  "exists" : true, "_source" : {"created_at":"Wed Dec 31 23:02:10 +0000 2014", ...
}

(remaining tweet data omitted for readability)

Indexing one day of data worked without any problems and took around 90 minutes, since only one machine is involved in that case (we have one file per day). I am currently testing how Elasticsearch copes with a higher load when several files are indexed in parallel.

Conclusion

In principle, all the tools needed to index Twitter data from HDFS with Elasticsearch in a distributed fashion are available, and once they work, the code is simple and straightforward:

records = LOAD '$DATA' USING PigStorage() AS (json:chararray);
tweets = FILTER records BY SUBSTRING(json, 0, 13) == '{"created_at"';
STORE tweets INTO '$INDEX' USING org.elasticsearch.hadoop.pig.EsStorage ('es.input.json = true', 'es.nodes = $HOST');

However, there are some obstacles one needs to take care of:

  1. Non-JSON lines in the feed (such as the status messages) can confuse some JSON parsers and therefore need to be filtered out.
  2. Parsing the JSON with Pig and storing the tuples to Elasticsearch did not work.
  3. An optimized index configuration is important.