Twitter Streaming API で流れてくるユーザー情報を保存するのに、初めは MySQL を使っていました。でも、自宅ファイルサーバと兼用のマシンなので I/O が重く、Cassandra に切り替えてみました。ちょうど、key-value store っぽい使いかたしかしていなかったし。
もう少しちゃんと実装してテストも書いてから出そうかと思ってたけど、数ヶ月進展しないからとりあえず貼ってみます。
package MyCassandra;
use Any::Moose;
use namespace::autoclean;
use Net::Cassandra;
use Data::MessagePack;
use Data::Dumper;
use Encode;
# Keyspace name; passed as the first argument of every Cassandra client call
# made by this class.
has 'keyspace' => (is => 'ro', isa => 'Str');
# Column family that every read, write and delete of this object addresses.
has 'columnFamily' => (is => 'ro', isa => 'Str');
# Hash reference handed to Net::Cassandra->new in BUILD.
has 'cassandraArgs' => (is => 'ro', isa => 'HashRef');
# Client handle obtained in BUILD. Declared 'bare', so only the private
# reader/writer pair named below is available for it.
has 'cassandra' =>
(is => 'bare', reader => '_get_cassandra', writer => '_set_cassandra');
# Construction hook: creates the Cassandra connection object and caches its
# client handle.
#
# A Net::Cassandra object is built from the 'cassandraArgs' attribute and its
# ->client is stored through the private writer, where every other method in
# this class picks it up via _get_cassandra.
#
# 'cassandraArgs' is not a required attribute, so an omitted value is replaced
# by an empty hash reference instead of being passed to the constructor as
# undef; Net::Cassandra is then free to apply whatever defaults it defines.
sub BUILD {
    my ($self) = @_;

    my $args      = $self->cassandraArgs || {};
    my $cassandra = Net::Cassandra->new($args);
    $self->_set_cassandra($cassandra->client);

    return;
}
# Returns the current time in whole milliseconds since the epoch, formatted
# as a string of decimal digits. Used as the column timestamp for writes and
# deletes in this class.
sub timestamp {
    # Time::HiRes supplies a time() with sub-second resolution in place of
    # the built-in one-second version.
    use Time::HiRes qw/time/;

    my $milliseconds = time() * 1000;

    # '%d' truncates the fractional part.
    return sprintf('%d', $milliseconds);
}
# Writes one column.
#
#   $rowKey - row key to write to
#   $colKey - column name inside the configured column family
#   $value  - value to store; serialized with Data::MessagePack
#
# The insert is issued with the current timestamp() and consistency level
# ZERO. Any exception raised while preparing or sending the request is
# re-thrown through confess as a Data::Dumper dump.
sub set {
    my ($self, $rowKey, $colKey, $value) = @_;

    eval {
        # Everything stays inside the eval so that failures in building the
        # request are reported the same way as failures in sending it.
        my $client   = $self->_get_cassandra;
        my $keyspace = $self->keyspace;
        my $path     = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );
        my $packed = Data::MessagePack->pack($value);

        $client->insert(
            $keyspace, $rowKey, $path, $packed, timestamp(),
            Net::Cassandra::Backend::ConsistencyLevel::ZERO
        );
    };
    $@ and confess Dumper($@);
}
# Writes several columns of one row in a single batch_insert call.
#
#   $rowKey - row key to write to
#   $data   - hash reference mapping column name => value; each value is
#             serialized with Data::MessagePack
#
# All columns share one timestamp() taken before the batch is built, and the
# batch is sent with consistency level ZERO. An exception raised by the
# batch_insert call is re-thrown through confess as a Data::Dumper dump.
sub set_slice {
    my ($self, $rowKey, $data) = @_;

    my $timestamp = timestamp();

    # keys() rather than each(): the loop must not depend on, or disturb, the
    # caller's hash iterator.
    my @cols;
    for my $name (keys %{$data}) {
        my $column = Net::Cassandra::Backend::Column->new(
            {   name      => $name,
                value     => Data::MessagePack->pack($data->{$name}),
                timestamp => $timestamp,
            }
        );
        push @cols,
            Net::Cassandra::Backend::ColumnOrSuperColumn->new(
            { column => $column });
    }

    eval {
        $self->_get_cassandra->batch_insert(
            $self->keyspace, $rowKey,
            # Reference to the column list, keyed by column family name.
            { $self->columnFamily => \@cols },
            Net::Cassandra::Backend::ConsistencyLevel::ZERO
        );
    };
    confess Dumper($@) if $@;
}
# Reads one column with consistency level QUORUM.
#
#   $rowKey - row key to read from
#   $colKey - column name inside the configured column family
#
# Returns undef when the client raises an exception whose string form starts
# with Net::Cassandra::Backend::NotFoundException. Any other exception is
# re-thrown through confess as a Data::Dumper dump. On success returns a hash
# reference with:
#   value     - the stored value, MessagePack-unpacked and passed through
#               my_decode
#   timestamp - the column's timestamp
sub get {
    my ($self, $rowKey, $colKey) = @_;

    my $what;
    eval {
        my $client   = $self->_get_cassandra;
        my $keyspace = $self->keyspace;
        my $path     = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );

        $what = $client->get(
            $keyspace, $rowKey, $path,
            Net::Cassandra::Backend::ConsistencyLevel::QUORUM
        );
    };

    if ($@) {
        # A missing column is an expected outcome, not an error.
        return undef
            if $@ =~ /^Net::Cassandra::Backend::NotFoundException/;
        confess Dumper($@);
    }

    my $unpacked = Data::MessagePack->unpack($what->column->value);
    my %result   = (
        value     => my_decode($unpacked),
        timestamp => $what->column->timestamp,
    );
    return \%result;
}
# Decodes UTF-8 octets into Perl character strings throughout a data
# structure.
#
#   $data - a plain scalar, an array reference or a hash reference
#
# Hash and array references are walked recursively and modified in place, so
# nested containers keep their structure instead of being stringified by
# Encode::decode_utf8. Hash values are decoded; hash keys are left untouched.
# Anything else is passed to Encode::decode_utf8 directly.
#
# Returns the decoded scalar, or the same reference that was passed in.
sub my_decode {
    my $data = shift;
    my $type = ref $data;

    if ($type eq 'HASH') {
        for my $key (keys %{$data}) {
            $data->{$key} = my_decode($data->{$key});
        }
    }
    elsif ($type eq 'ARRAY') {
        # The loop variable aliases each element, so assignment updates the
        # array itself.
        for my $element (@{$data}) {
            $element = my_decode($element);
        }
    }
    else {
        $data = Encode::decode_utf8($data);
    }

    return $data;
}
# Encodes Perl character strings into UTF-8 octets throughout a data
# structure; the counterpart of my_decode.
#
#   $data - a plain scalar, an array reference or a hash reference
#
# Hash and array references are walked recursively and modified in place, so
# nested containers keep their structure instead of being stringified by
# Encode::encode_utf8. Hash values are encoded; hash keys are left untouched.
# Anything else is passed to Encode::encode_utf8 directly.
#
# Returns the encoded scalar, or the same reference that was passed in.
sub my_encode {
    my $data = shift;
    my $type = ref $data;

    if ($type eq 'HASH') {
        for my $key (keys %{$data}) {
            $data->{$key} = my_encode($data->{$key});
        }
    }
    elsif ($type eq 'ARRAY') {
        # The loop variable aliases each element, so assignment updates the
        # array itself.
        for my $element (@{$data}) {
            $element = my_encode($element);
        }
    }
    else {
        $data = Encode::encode_utf8($data);
    }

    return $data;
}
# Reads the columns of one row with consistency level QUORUM.
#
#   $rowKey - row key to read from
#
# The slice range is open at both ends (empty start and finish). Any
# exception raised while building or sending the request is re-thrown through
# confess as a Data::Dumper dump.
#
# Returns a hash reference mapping column name => value, where each value has
# been MessagePack-unpacked and passed through my_decode.
sub get_slice {
    my ($self, $rowKey) = @_;

    my $what;
    eval {
        $what = $self->_get_cassandra->get_slice(
            $self->keyspace,
            $rowKey,
            Net::Cassandra::Backend::ColumnParent->new(
                { column_family => $self->columnFamily }
            ),
            Net::Cassandra::Backend::SlicePredicate->new(
                {   slice_range => Net::Cassandra::Backend::SliceRange->new(
                        # XXX: default limit 100
                        { start => '', finish => '' }
                    )
                }
            ),
            Net::Cassandra::Backend::ConsistencyLevel::QUORUM
        );
    };
    confess Dumper($@) if $@;

    my %hash;
    foreach my $data (@{$what}) {
        # XXX: no timestamp
        $hash{ $data->column->name }
            = my_decode(Data::MessagePack->unpack($data->column->value));
    }

    return \%hash;
}
# Removes one column.
#
#   $rowKey - row key to delete from
#   $colKey - column name inside the configured column family
#
# The removal is issued with the current timestamp(). Any exception raised
# while preparing or sending the request is re-thrown through confess as a
# Data::Dumper dump.
sub del {
    my ($self, $rowKey, $colKey) = @_;

    eval {
        my $client   = $self->_get_cassandra;
        my $keyspace = $self->keyspace;
        my $path     = Net::Cassandra::Backend::ColumnPath->new(
            {   column_family => $self->columnFamily,
                column        => $colKey,
            }
        );

        $client->remove($keyspace, $rowKey, $path, timestamp());
    };
    $@ and confess Dumper($@);
}
# Finalize the class definition once all attributes and methods are declared.
__PACKAGE__->meta->make_immutable;

# Explicit true value so that loading this module with require/use does not
# depend on what make_immutable happens to return.
1;