Yet another MapReduce implementation in Ruby

This post is under construction.

I’m working on a recommendations engine. As some processes may be lengthy, I’m also implementing a simple MapReduce framework on top of Ruby/DRb. I tried Skynet for a while, but it looks immature, and I needed something simple and fast to implement.

The Server – Worker

The DRb server was pretty simple to implement. I just wrapped my recommendations engine in a single class and created a function for starting it as a DRb server:

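require 'drb'

# Expose the recommendations engine as a DRb service.
# The port comes from the first command-line argument (default 5850).
def start_server
  uri = "druby://0.0.0.0:#{ARGV[0] || '5850'}"
  # Ctrl-C ends the DRb thread so the join below returns.
  trap("INT") { puts("Interrupted"); DRb.thread.exit }
  DRb.start_service(uri, RecommendationsServer.new)
  puts("Listening #{uri}")
  DRb.thread.join
end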

Just run one server instance on every processor you want to share the processing load with.
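
The RecommendationsServer class itself is not shown in this post. As a rough sketch of the shape a worker needs, the hypothetical ToyWorker below exposes a calculated_matrix method that takes the same option keys used later in the post (:prefs and :selection) and returns a hash keyed by the selected items. The computation is only a placeholder, and the service is started in the same process on a free local port just to keep the example self-contained:

```ruby
require 'drb'

# Hypothetical stand-in for RecommendationsServer. Only the method name and
# the option keys (:prefs, :selection) are taken from the post.
class ToyWorker
  # Returns one entry per selected key, so partial results coming from
  # several workers can be merged into a single hash afterwards.
  def calculated_matrix(args)
    args[:selection].each_with_object({}) do |key, out|
      out[key] = args[:prefs][key].size # placeholder computation
    end
  end
end

# Port 0 asks the OS for any free port; server.uri holds the real address.
server = DRb.start_service('druby://127.0.0.1:0', ToyWorker.new)
worker = DRbObject.new_with_uri(server.uri)

prefs   = { 'ana' => %w[a b c], 'luis' => %w[a], 'eva' => %w[b c] }
partial = worker.calculated_matrix(:prefs => prefs, :selection => %w[ana eva])
p partial

server.stop_service
```

Because each worker only returns entries for its own selection, merging the partial hashes gives the full result.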

The Client – Controller

Next, I crafted a simple program that implements the MapReduce model by segmenting an array with the data to process, distributing the segments to the workers, and yielding the desired operation (which must be defined on the worker server class).

# map
# data  : hash to be mapped to @workers
# worker_object and segment are yielded
def map(data)
  # data = @apps  # linear version (no procedure)
  threads=[]
  result={}
  inicio = 0
  # sec_len = data.keys.length / @workers.length  # equal loads
  @workers.each do |worker_uri,porcentaje_carga|
    threads << Thread.new(
          # :prefs=>prefs,  # equal load
          :segmento=>segmento(data.keys,inicio,porcentaje_carga),
          # :selection=>@apps.keys[worker_n*sec_len..(1+worker_n)*sec_len],  # equal load across processes
          :worker_uri=>worker_uri) do |p|
      msg("Spawning worker with #{porcentaje_carga}% load @ #{p[:worker_uri]} ")
      worker_object = DRbObject.new(nil,p[:worker_uri])
      result["result_#{worker_uri}"] = yield(worker_object,p[:segmento])
      # result["result_#{worker_uri}"] = worker_object.calculated_matrix(:prefs=>p[:prefs], :selection=>p[:selection], :type=>:similar, :print=>true)
    end
    # Advance the start index past the keys assigned to this worker.
    inicio += long_segmento(data.keys.length,porcentaje_carga)
  end
  threads.each{|t| t.join}
  return result
end

def reduce(result_set,output_filename=nil,result_set_name=nil)
  r = {}
  result_set.each_value{|worker_result| r.merge!(worker_result) if worker_result}

  if output_filename
    File.open(output_filename,"w") do |f|
      f.write("#{result_set_name||'result'}=#{r.inspect}")
    end
  end

  return r

end
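
The map method above relies on two helpers, segmento and long_segmento, that are not shown here. The following is only a guess at what they might look like, assuming each worker's load is an integer percentage of the keys and that segment lengths are simply rounded; the worker URIs are made up for the example:

```ruby
# Hypothetical helpers: the post calls segmento and long_segmento but does
# not show them, so the rounding behaviour here is an assumption.

# Number of keys assigned to a worker that takes porcentaje_carga percent.
def long_segmento(total, porcentaje_carga)
  (total * porcentaje_carga / 100.0).round
end

# Slice of keys starting at inicio, sized for the given load percentage.
def segmento(keys, inicio, porcentaje_carga)
  keys[inicio, long_segmento(keys.length, porcentaje_carga)] || []
end

keys    = (1..10).to_a
workers = { 'druby://host-a:5850' => 70, 'druby://host-b:5850' => 30 }

inicio = 0
workers.each do |uri, pct|
  puts "#{uri}: #{segmento(keys, inicio, pct).inspect}"
  inicio += long_segmento(keys.length, pct)
end
```

With percentages that do not divide the key count evenly, plain rounding can leave a key unassigned or assign one twice, so a real version would probably hand any remainder to the last worker.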

A wrapper method simplifies calling a whole MapReduce cycle:

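# args:
#   :data => data set to process
#   :process_name => label printed in the log (optional)
#   :output_filename => output file (optional)
#   :result_set_name => variable name written to the output file (optional)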
def map_reduce(args)
  msg(args[:process_name]) if args[:process_name]

  #map
  start = Time.now
  msg("Mapping...")
  result_set = map(args[:data]){|worker_object,segmento|
    #worker_object.calculated_matrix(:prefs=>args[:data],:selection=>segmento,:type=>args[:type],:print=>:true)
    yield(worker_object,segmento)
  }
  msg_ok(start)

  #reduce
  msg("Reducing...")
  start_red = Time.now
  r=reduce(result_set,args[:output_filename],args[:result_set_name])
  msg_ok(start_red)

  msg_ok(start,"#{args[:process_name]} finished") if args[:process_name]
  return r

end
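
The msg and msg_ok logging helpers used above are not shown either. A minimal sketch, assuming msg just prints a timestamped line and msg_ok reports the time elapsed since start (the output format and the default 'OK' label are my own invention), could be:

```ruby
# Hypothetical logging helpers. Only their names and call shapes
# (msg(text), msg_ok(start), msg_ok(start, text)) come from the post.
def msg(text)
  puts "[#{Time.now.strftime('%H:%M:%S')}] #{text}"
end

# Prints the label plus the seconds elapsed since start; returns the seconds.
def msg_ok(start, text = 'OK')
  elapsed = Time.now - start
  msg(format('%s (%.2f s)', text, elapsed))
  elapsed
end

start = Time.now
msg('Mapping...')
sleep 0.05 # stands in for the real work
msg_ok(start)
```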

Finally, calling the complete MapReduce process is as simple as:

  ub_recs = map_reduce(
    :data=>@apps,
    :process_name=>'Calculating user based recommendations',
    :output_filename=>'__ub_recs.rb',
    :result_set_name=>'@@ub_recs') do |worker_object,segmento|
      worker_object.calculated_matrix(:prefs=>@apps,:selection=>segmento,:type=>:recommendations,:print=>true)

  end

The yield keyword hands each worker proxy and its data segment to the block, and the block invokes the desired operation, with its parameters, on the worker process.

Later, I’ll post on how to load-balance between workers, which proved to be both critical and trickier than it first appeared.

 

 

Firefox 3/2 side-by-side on Windows

Want to use Firefox 2 and 3 side by side? Check this. Although it describes the procedure for Mac/Linux, almost everything is the same on Windows. Just follow these recommendations:

  1. Download the *.win32.zip from the nightly build.
  2. Unpack it into a new directory, say \Program Files\Mozilla Firefox 3
  3. Create the new profile as described
  4. Create a shortcut to \Program Files\Mozilla Firefox 3\firefox.exe with options:
     -P newprofile -no-remote

That’s it!

On Ruby & Python Performance

I’m working on some algorithms taken from the amazing book “Programming Collective Intelligence” by Toby Segaran. Some of these algorithms can be pretty computationally intensive, so I’m also working on _yet another implementation_ of MapReduce on Ruby/DRb. That meant re-implementing Toby’s Python algorithms in Ruby, which was pretty straightforward.

This small table shows the processing time of one of the algorithms running on different languages/implementations with the same data subset.

 

Language          Version   Time
Ruby              1.8.6     16.6 s
Python            2.5.4     4.15 s
IronPython        1.1.1     10.45 s
Ruby MapReduced             10.65 s