This post is under construction.
I’m working on a Recommendations engine. As some processes may be lengthy, I’m also implementing a simple MapReduce framework using a Ruby/DRb scheme. I tried Skynet for a while, but it looks immature, and I needed something simple and fast to implement.
The Server – Worker
The DRb server was pretty simple to implement. I just wrapped my Recommendations engine in a single class and created a function for starting it as a DRb server:
require 'drb'

def start_server
  uri = "druby://0.0.0.0:#{ARGV[0] || '5850'}"
  trap("INT") { puts("Interrupted"); DRb.thread.exit }
  DRb.start_service(uri, RecommendationsServer.new)
  puts("Listening #{uri}")
  DRb.thread.join
end
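The RecommendationsServer class itself is not shown in this post. As a hedged sketch, a worker class exposing the calculated_matrix method (the name is taken from the calls further below; its body here is just a stub) might look like:

```ruby
# Hypothetical sketch of the worker class served over DRb; the real
# Recommendations engine is not shown in the post. Only the method name
# calculated_matrix comes from the calls below -- the body is a stand-in.
class RecommendationsServer
  # Computes a (here: dummy) matrix for the keys in :selection only,
  # so each worker processes just its own segment of the data.
  def calculated_matrix(args)
    prefs     = args[:prefs]      # full data set
    selection = args[:selection]  # keys this worker is responsible for
    selection.each_with_object({}) do |key, matrix|
      matrix[key] = prefs[key]    # stand-in for the real similarity math
    end
  end
end
```

Because each worker only touches its own :selection, the partial results can later be merged back together by the reduce step.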
Just run the server on every machine or processor to which you want to distribute some of the processing load.
The Client – Controller
Next, I crafted a simple program that implements the MapReduce model by segmenting an array with the data to process, distributing the segments to the workers, and yielding the desired operation (which must be defined on the worker server class).
# data : hash to be mapped to @workers
# worker_object and segment are yielded
def map(data)
  #data = @apps # linear version (no distribution)
  threads = []
  result = {}
  inicio = 0
  #sec_len = data.keys.length / @workers.length # equal loads
  @workers.each do |worker_uri, porcentaje_carga|
    threads << Thread.new(
      #:prefs=>prefs, # equal load
      :segmento => segmento(data.keys, inicio, porcentaje_carga),
      #:selection=>@apps.keys[worker_n*sec_len..(1+worker_n)*sec_len], # equal load across processes
      :worker_uri => worker_uri) do |p|
      msg("Spawning worker with #{porcentaje_carga}% load @ #{p[:worker_uri]}")
      worker_object = DRbObject.new(nil, p[:worker_uri])
      result["result_#{worker_uri}"] = yield(worker_object, p[:segmento])
      #result["result_#{worker_uri}"] = worker_object.calculated_matrix(:prefs=>p[:prefs], :selection=>p[:selection], :type=>:similar, :print=>true)
    end
    inicio += long_segmento(data.keys.length, porcentaje_carga)
  end
  threads.each { |t| t.join }
  return result
end
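The map function relies on two helpers, segmento and long_segmento, that the post does not show. A minimal sketch, assuming porcentaje_carga is an integer percentage of the keys assigned to each worker:

```ruby
# Hypothetical helpers assumed by map; they are not shown in the post.
# Assumes porcentaje_carga is an integer percentage (the worker's share).

# How many keys a given percentage of the total represents.
def long_segmento(total, porcentaje_carga)
  (total * porcentaje_carga) / 100
end

# The slice of keys assigned to a worker, starting at offset inicio.
def segmento(keys, inicio, porcentaje_carga)
  keys[inicio, long_segmento(keys.length, porcentaje_carga)]
end
```

Note that integer division means shares that don't divide the key count evenly can leave a few trailing keys unassigned, which is part of why load balancing between workers gets tricky.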
def reduce(result_set, output_filename=nil, result_set_name=nil)
  r = {}
  result_set.each_value { |worker_result| r.merge!(worker_result) if worker_result }
  if output_filename
    File.open(output_filename, "w") do |f|
      f.write("#{result_set_name || 'result'}=#{r.inspect}")
    end
  end
  return r
end
A helper simplifies the process of calling a whole MapReduce cycle:
# args:
# :prefs => data set
# :type => type of matrix to compute
# :output_filename => output file
def map_reduce(args)
  msg(args[:process_name]) if args[:process_name]
  # map
  start = Time.now
  msg("Mapping...")
  result_set = map(args[:data]) { |worker_object, segmento|
    #worker_object.calculated_matrix(:prefs=>args[:data],:selection=>segmento,:type=>args[:type],:print=>:true)
    yield(worker_object, segmento)
  }
  msg_ok(start)
  # reduce
  msg("Reducing...")
  start_red = Time.now
  r = reduce(result_set, args[:output_filename], args[:result_set_name])
  msg_ok(start_red)
  msg_ok(start, "#{args[:process_name]} finished") if args[:process_name]
  return r
end
Finally, calling the complete MapReduce process is as simple as:
ub_recs = map_reduce(
  :data => @apps,
  :process_name => 'Calculating user based recommendations',
  :output_filename => '__ub_recs.rb',
  :result_set_name => '@@ub_recs') do |worker_object, segmento|
  worker_object.calculated_matrix(:prefs => @apps, :selection => segmento, :type => :recommendations, :print => true)
end
Inside map, the yield keyword hands each worker object and its data segment to this block, which invokes the desired operation on the remote worker process.
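The whole cycle can be exercised locally without DRb by standing in plain Ruby objects for the remote workers. This is a sketch under assumptions (the Worker stub, its echo_segment method, and local_map_reduce are inventions for the demo; threads are omitted for clarity), but it shows how the per-worker result hashes end up merged by the reduce step:

```ruby
# Local, DRb-free sketch of the map/reduce cycle. Worker, echo_segment and
# local_map_reduce are hypothetical stand-ins for illustration only; the
# final merge mirrors the reduce step above.
class Worker
  # Pretend remote operation: tag each key in the segment with this worker's name.
  def echo_segment(name, segment)
    segment.each_with_object({}) { |k, h| h[k] = name }
  end
end

def local_map_reduce(keys, shares)
  result_set = {}
  inicio = 0
  shares.each do |name, pct|
    len = (keys.length * pct) / 100      # this worker's share of the keys
    segment = keys[inicio, len]
    result_set[name] = Worker.new.echo_segment(name, segment)
    inicio += len
  end
  # reduce: merge the per-worker hashes into one result
  result_set.each_value.reduce({}) { |r, part| r.merge!(part) }
end
```

For example, local_map_reduce(("a".."d").to_a, "w1" => 50, "w2" => 50) splits the keys 50/50 between the two stand-in workers and merges their partial hashes back into a single result.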
Later, I’ll post on how to load-balance between workers, which proved to be both critical and trickier than it first appeared.