Yet another MapReduce implementation in Ruby

This post is on construction.

I’m working on a Recommendations engine. As some processes may be lengthy, I’m also implementing a simple MapReduce framework using a Ruby/DRb schema. I tried Skynet for a while but it looks inmature, and I needed something simple and fast to implement.

The Server – Worker

The DRb server was pretty simple to implement. I just wrapped my Recommendations engine on a single class and created a function for starting it as a DRb server:

def start_server
  uri="druby://0.0.0.0:#{ARGV[0]||'5850'}"
  trap("INT"){puts("Interrupted"); DRb.thread.exit}
  DRb.start_service(uri, RecommendationsServer.new)
  puts("Listening #{uri}")
  DRb.thread.join
end

Just run the server on every processor to which you can distribute some processing load.

The Client – Controller

Next, I crafted a simple program that implemented the MapReduce model by segmenting an array with the data to process, distribute the segments to the workers and yielding the disered operation (which must be defined on the worker server class).

#map
# data  : hash to be mapped to @workers
# worker_object and segment are yielded
def map(data)
  #data = @apps #version lineal (sin procedimiento)
  threads=[]
  result={}
  inicio = 0
  #sec_len = data.keys.length / @workers.length #cargas iguales
  @workers.each do |worker_uri,porcentaje_carga|
    threads << Thread.new(
          #:prefs=>prefs, #carga igual
          :segmento=>segmento(data.keys,inicio,porcentaje_carga),
          #:selection=>@apps.keys[worker_n*sec_len..(1+worker_n)*sec_len], #carga igual entre procesos
          :worker_uri=>worker_uri) do |p|
      msg("Spawning worker with #{porcentaje_carga}% load @ #{p[:worker_uri]} ")
      worker_object = DRbObject.new(nil,p[:worker_uri])
      result["result_#{worker_uri}"] = yield(worker_object,p[:segmento])
      #result["result_#{worker_uri}"] = worker_object.calculated_matrix(:prefs=>p[:prefs], :selection=>p[:selection], :type=>:similar, :print=>true)
    end
    inicio += long_segmento(data.keys.length,porcentaje_carga)
  end
  threads.each{|t| t.join}
  return result
end

def reduce(result_set,output_filename=nil,result_set_name=nil)
  r = {}
  result_set.each_value{|worker_result| r.merge!(worker_result) if worker_result}

  if output_filename
    File.open(output_filename,"w") do |f|
      f.write("#{result_set_name||'result'}=#{r.inspect}")
    end
  end

  return r

end

Simplify the process of calling a whole MapReduce cycle:

#args:
#  :prefs=> conjunto de datos
#  :type=> tipo de matriz a calcular
#  :output_filename => archivo de salida
def map_reduce(args)
  msg(args[:process_name]) if args[:process_name]

  #map
  start = Time.now
  msg("Mapping...")
  result_set = map(args[:data]){|worker_object,segmento|
    #worker_object.calculated_matrix(:prefs=>args[:data],:selection=>segmento,:type=>args[:type],:print=>:true)
    yield(worker_object,segmento)
  }
  msg_ok(start)

  #reduce
  msg("Reducing...")
  start_red = Time.now
  r=reduce(result_set,args[:output_filename],args[:result_set_name])
  msg_ok(start_red)

  msg_ok(start,"#{args[:process_name]} finished") if args[:process_name]
  return r

end

Finally, calling the MapReduce complete process is as simple as:

  ub_recs = map_reduce(
    :data=>@apps,
    :process_name=>'Calculating user based recommendations',
    :output_filename=>'__ub_recs.rb',
    :result_set_name=>'@@ub_recs') do |worker_object,segmento|
      worker_object.calculated_matrix(:prefs=>@apps,:selection=>segmento,:type=>:recommendations,:print=>true)

  end

The yield keyword passes the segmented data and operation parameters to the worker process.

Later, I’ll post on how to load-balance between workers, which proved to be both critical and trickier than first apperared.