elasticsearchのindex更新を非同期に行う(tire + resque)


昨日の記事でtireを用いて全文検索エンジンのelasticsearchを操作する方法をまとめました。

前回はデータ追加、削除時に即indexの更新、削除を行っていましたが、ジョブキューサーバに処理を投げてindex更新を非同期に行えるようにしてみます。

ジョブキューには先日ご紹介したresqueを使います。

初期設定

  • Gemfile
#Gemfile

gem 'resque'
gem 'daemon-spawn', :require => 'daemon_spawn'
  • bundle install
./bin/bundle install

Topicモデル更新

昨日の記事で作成したTopicモデルのindex_updateメソッドとindex_removeメソッドを修正します。

  • before
# app/models/topic.rb

  # save後にindexを更新
  def index_update
    self.index.store self
  end

  # destroy後にindexから削除
  def index_remove
    self.index.remove self
  end
  • after

resqueに処理を投げるようにします

#app/models/topic.rb

  # save後にindexを更新
  def index_update
    Resque.enqueue(IndexUpdater, :update, self.id)
  end

  # destroy後にindexから削除
  def index_remove
    Resque.enqueue(IndexUpdater, :delete, self.id)
  end

Worker作成

  • workersディレクトリ作成
mkdir app/workers
  • index_updater追加
# app/workers/index_updater.rb

class IndexUpdater
  @queue = :defaul

  def self.perform(action_type, id)
    case action_type.to_sym
    when :update
      # :updateの場合はindex更新
      Topic.find(id).tire.update_index
    when :delete
      # :deleteの場合はindexから該当Topicを削除
      Topic.index.remove Topic.find(id)
    end
  end
end
  • resque_worker用のdaemonスクリプト作成
#bin/resque_worker

#!/usr/bin/env ruby
require File.expand_path('../../config/application', __FILE__)
Rails.application.require_environment!

class ResqueWorkerDaemon < DaemonSpawn::Base
  def start(args)
    @worker = Resque::Worker.new('default')
    @worker.verbose = true
    @worker.work
  end

  def stop
  end
end

ResqueWorkerDaemon.spawn!({
  :processes => 1,
  :working_dir => Rails.root,
  :pid_file => File.join(Rails.root, 'tmp', 'pids', 'resque_worker.pid'),
  :log_file => File.join(Rails.root, 'log', 'resque_worker.log'),
  :sync_log => true,
  :singleton => true,
  :signal => 'QUIT'
})
  • 実行権限追加
chmod 755 ./bin/resque_worker.rb
  • daemon起動
./bin/resque_worker.rb start

確認

  • 別ターミナルでtail -f log/resque_worker.log でログを眺めておく

  • rails consoleからTopicを追加

./bin/rails c
irb(main):001:0> Topic.create(title: 'Nodeでhello world', body: 'おはようございます。サンプルです')
irb(main):002:0> Topic.create(title: 'Nodeでhello world', body: 'こんにちわ。サンプルです')
  • ログにindex更新の結果が残る
# log/resque_worker.log

*** Found job on default
*** got: (Job{default} | IndexUpdater | ["update", 8])
*** resque-1.25.1: Processing default since 1386161763 [IndexUpdater]
*** Running before_fork hooks with [(Job{default} | IndexUpdater | ["update", 8])]
*** resque-1.25.1: Forked 42946 at 1386161763
*** Running after_fork hooks with [(Job{default} | IndexUpdater | ["update", 8])]
*** done: (Job{default} | IndexUpdater | ["update", 8])
  • ブラウザから「node」で検索すると絞り込まれた結果が取得できます

http://localhost:3000/?keyword=node

{
  "paging": {
    "current_page": 1,
    "per_page": 3,
    "total_pages": 1,
    "total": 2
  },
  "topics": [
    {
      "updated_at": "2013-12-04T12:56:00.161Z",
      "created_at": "2013-12-04T12:56:00.161Z",
      "body": "こんにちわ。サンプルです",
      "title": "Nodeでhello world",
      "id": 8
    },
    {
      "updated_at": "2013-12-04T12:55:01.113Z",
      "created_at": "2013-12-04T12:55:01.113Z",
      "body": "おはようございます。サンプルです",
      "title": "Nodeでhello world",
      "id": 7
    }
  ]
}

まとめ

resqueを使ってelasticsearchのindex更新を非同期に行うことができました。

サンプルコードはこちらに置いておきます:hilotter / tire_sample

参考