Heartbeat logging while consuming Twitter streams using Phirehose

by Jonathon on June 11th, 2011

Phirehose is an awesomely useful Twitter Streaming API client library, written in PHP by Fenn Bailey.

I originally added heartbeat logging for Rainmaker, and I've finally gotten around to contributing those modifications back to Phirehose; you can see them on GitHub.

Why log heartbeats in Phirehose?

  • To confirm that Phirehose is still alive and actually functioning. In our case, missed tweets mean lost money and unhappy clients, so we needed to monitor this very closely.
  • To automatically detect connection drops and recover the tweets missed while disconnected, either by rewinding the stream with the count parameter or by backfilling them from the Twitter Search API.
  • To collect usage data for reporting purposes.
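The first point boils down to checking that heartbeats keep arriving. As a minimal sketch, assuming heartbeats are logged at a roughly regular interval, a separate monitoring job could compare the newest logged heartbeat timestamp against the current time. The function name heartbeat_is_stale and the threshold passed to it are hypothetical; they are not part of Phirehose.

```php
<?php

// Hypothetical watchdog check (not part of Phirehose): returns TRUE when the
// newest heartbeat is older than $maxAgeSeconds, i.e. the consumer has
// probably stalled or died. $lastHeartbeat is assumed to be a
// 'Y-m-d H:i:s' string such as a logged heartbeat time_stamp.
function heartbeat_is_stale($lastHeartbeat, $maxAgeSeconds, $now = null)
{
    $now = ($now === null) ? time() : $now;
    $ts = strtotime($lastHeartbeat);
    if ($ts === false) {
        // A missing or unparseable timestamp is treated as stale.
        return true;
    }
    return ($now - $ts) > $maxAgeSeconds;
}
```

A cron job could pass it the most recent heartbeat timestamp and raise an alert when it returns TRUE. Picking $maxAgeSeconds comfortably larger than the heartbeat interval avoids false alarms from a single slow period.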

Usage

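To use this, declare a heartbeat(array $data) method in your Phirehose subclass. Phirehose passes it an array of stream statistics; the example below writes the elapsed, statusRate, statusCount, enqueueSpent, enqueueSpentAvg, filterCheckCount, filterCheckSpent, idlePeriod and maxIdlePeriod values to a MySQL table named stream_log.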

Here is an example:

<?php

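// USAGE:
//
//   require_once 'phirehose.php';
//   define('TWITTER_API_USERNAME', 'username'); // used by Initialize()
//   define('TWITTER_API_PASSWORD', 'password');
//   $dbh = mysql_connect($host, $username, $password);
//   mysql_select_db($database, $dbh); // the database containing stream_log
//   PhirehoseConsumer::Initialize($dbh);
//   PhirehoseConsumer::start();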

class PhirehoseConsumer extends Phirehose {

	private static $instance;
	private static $dbh;

	protected $consumer_start_time = 0;
	protected $consumer_uptime = 0;

	public static function Initialize($dbh) {
		if (! self::$instance instanceof self) {
			self::$dbh = $dbh;
			self::$instance = new self ( TWITTER_API_USERNAME, TWITTER_API_PASSWORD, Phirehose::METHOD_FILTER );
		}
	}

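	protected static function get_last_heartbeat() {
		$sql = 'SELECT time_stamp FROM stream_log ORDER BY time_stamp DESC LIMIT 1';
		// mysql_query() takes the query first and the link identifier second
		$result = mysql_query ( $sql, self::$dbh );
		// mysql_result() warns on an empty result set, so check the row count first
		$last_heartbeat_ts = ($result && mysql_num_rows ( $result ) > 0) ? mysql_result ( $result, 0, 'time_stamp' ) : FALSE;
		if ($last_heartbeat_ts === FALSE || strtotime ( $last_heartbeat_ts ) === FALSE)
			$last_heartbeat_ts = date ( 'Y-m-d H:i:s' ); // default to now
		return $last_heartbeat_ts;
	}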

	protected static function get_missed_count() {

		// get timestamp of the most recent intake script heartbeat
		$last_heartbeat_ts = self::get_last_heartbeat ();

		// how long since we were last running?
		$downtime = ( int ) (time () - strtotime ( $last_heartbeat_ts ));

		// Based on the past average, how many tweets did we probably miss?
		//
		// This calculation is based on the statusRate averaged over a period of
		// time not greater than $downtime, and multiplied by a 1.5x fudge factor.
		$sql = <<<SQL
SELECT CEIL(1.5 * AVG(statusRate) * $downtime) AS count
FROM stream_log WHERE
    (UNIX_TIMESTAMP('$last_heartbeat_ts') - UNIX_TIMESTAMP(time_stamp)) > 0 AND
    (UNIX_TIMESTAMP('$last_heartbeat_ts') - UNIX_TIMESTAMP(time_stamp)) <= $downtime
SQL;
		$result = mysql_query ( $sql, self::$dbh );
		// AVG() over no matching rows yields NULL, which the (int) cast below turns into 0
		$count = $result ? mysql_result ( $result, 0, 'count' ) : FALSE;

		return ($count === FALSE) ? 0 : ( int ) $count;
	}

	public static function start() {
		// get timestamp of the most recent heartbeat
		$last_heartbeat_ts = self::get_last_heartbeat ();

		// Estimate how many tweets were missed, and rewind.
		//
		// Note that if you're using the filter streaming method, this will fail with the following error:
		//   "The Streaming API count parameter is not allowed in role statusDefaultFiltered."
		$count = self::get_missed_count ();
		if ($count) {
			self::$instance->log ( "Using count: $count" );
			self::$instance->setCount ( $count );
		}

		// run backfill script in background - uses Twitter Search API
		self::$instance->log ( "Backfilling from $last_heartbeat_ts" );
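		// escape the timestamp for the shell; assumes backfill.php is executable
		exec ( 'backfill.php ' . escapeshellarg ( $last_heartbeat_ts ) . ' > /dev/null 2>&1 &' );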

		self::$instance->checkFilterPredicates ();
		self::$instance->consume ();
	}

	public function enqueueStatus($status) {
		// do something with your tweet
		return TRUE;
	}

	public function heartbeat(array $data) {
		$sql = <<<SQL
INSERT INTO stream_log SET
    time_stamp = CURRENT_TIMESTAMP,
    elapsed = $data[elapsed],
    statusRate = $data[statusRate],
    statusCount = $data[statusCount],
    enqueueSpent = $data[enqueueSpent],
    enqueueSpentAvg = $data[enqueueSpentAvg],
    filterCheckCount = $data[filterCheckCount],
    filterCheckSpent = $data[filterCheckSpent],
    idlePeriod = $data[idlePeriod],
    maxIdlePeriod = $data[maxIdlePeriod]
SQL;
		// $dbh is a static property, and mysql_query() takes the query first
		mysql_query ( $sql, self::$dbh );
	}

	protected function checkFilterPredicates() {
		// useful for debugging
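		// start() calls this before consume(), i.e. before connect() has set a start time
		$this->consumer_uptime = $this->consumer_start_time ? time () - $this->consumer_start_time : 0;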
		$this->log ( sprintf ( 'Uptime: %s, memory usage: %skb', $this->consumer_uptime, number_format ( memory_get_usage () / 1024, 0 ) ) );

		// update filter predicates...
	}

	protected function connect() {
		$this->consumer_start_time = time ();
		parent::connect ();
	}

}
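The rewind estimate in get_missed_count() is plain arithmetic: the average statusRate of the heartbeats logged within $downtime seconds before the last heartbeat, multiplied by $downtime and by the 1.5 fudge factor, then rounded up. To make that easy to check outside MySQL, here is a pure-PHP sketch of the same calculation. The function estimate_missed_count() and its $rows input format are hypothetical, and it assumes statusRate is measured in tweets per second so that the product is a tweet count.

```php
<?php

// Hypothetical pure-PHP version of the estimate get_missed_count() runs in SQL:
// CEIL(1.5 * AVG(statusRate) * downtime).
// $rows is assumed to be a list of array('time_stamp' => ..., 'statusRate' => ...)
// entries read back from stream_log; $lastHeartbeat must be a parseable timestamp.
function estimate_missed_count(array $rows, $lastHeartbeat, $now, $fudge = 1.5)
{
    $last = strtotime($lastHeartbeat);
    $downtime = (int) ($now - $last);

    // Average statusRate over heartbeats logged strictly before the last one,
    // going back no further than $downtime seconds.
    $sum = 0;
    $n = 0;
    foreach ($rows as $row) {
        $age = $last - strtotime($row['time_stamp']);
        if ($age > 0 && $age <= $downtime) {
            $sum += $row['statusRate'];
            $n++;
        }
    }

    if ($n === 0 || $downtime <= 0) {
        return 0; // nothing to average over, so don't rewind
    }
    return (int) ceil($fudge * ($sum / $n) * $downtime);
}
```

For example, two qualifying heartbeats with statusRate values of 2 and 4 average to 3, so 100 seconds of downtime gives ceil(1.5 * 3 * 100) = 450 tweets to request via setCount().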
