fluke/devops-service/workers/worker.rb

143 lines
3.3 KiB
Ruby
Raw Permalink Normal View History

2014-10-22 15:01:55 +04:00
root = File.join(File.dirname(__FILE__), "..")
$LOAD_PATH.push root unless $LOAD_PATH.include? root
2018-04-04 22:44:39 +03:00
require 'tilt/erubis'
2014-10-22 15:01:55 +04:00
require "sidekiq"
require "sidekiq/api"
2014-12-04 13:07:25 +03:00
require "fileutils"
2015-08-12 11:37:17 +03:00
require "lib/knife/knife_factory"
require "lib/puts_and_flush"
2015-10-26 12:50:31 +03:00
2018-04-04 22:44:39 +03:00
require "db/mongo/models/job_task"
require "db/mongo/models/server"
require "lib/executors/server_executor"
2015-10-26 12:50:31 +03:00
# All options keys MUST be a symbol!!!
2014-10-22 15:01:55 +04:00
class Worker
include Sidekiq::Worker
include PutsAndFlush
2014-10-22 15:01:55 +04:00
2018-04-04 22:44:39 +03:00
# sidekiq_options :queue => :cid, :retry => false
sidekiq_options :retry => false
2015-09-11 12:05:13 +03:00
attr_accessor :out
2014-12-04 13:07:25 +03:00
module STATUS
INIT = "init"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
IN_QUEUE = "queued"
2014-12-04 13:07:25 +03:00
end
2015-11-03 12:05:07 +03:00
def self.start_async(worker_class, job_options)
2015-09-01 16:31:31 +03:00
jid = worker_class.perform_async(job_options.dup)
2018-04-04 22:44:39 +03:00
Devops::Model::JobTask.create({
"_id" => jid,
"status" => Worker::STATUS::IN_QUEUE
})
2015-09-01 16:54:21 +03:00
DevopsLogger.logger.info "Job '#{jid}' has been queued"
2015-11-03 12:05:07 +03:00
jid
2014-10-22 15:01:55 +04:00
end
2018-04-04 22:44:39 +03:00
def self.start_sync(worker_class, job_options, out)
stringified_options = job_options
2015-10-26 12:50:31 +03:00
=begin
job_options.each do |key, value|
stringified_options[key.to_s] = value
end
2015-10-26 12:50:31 +03:00
=end
2015-09-11 12:05:13 +03:00
w = worker_class.new
w.out = out
w.perform(stringified_options)
end
def self.set_status id, status
Sidekiq.redis {|con| con.hset "devops", id, status}
end
2015-11-03 12:05:07 +03:00
def call &block
2018-04-04 22:44:39 +03:00
if jid
call_async(&block)
else
call_sync(&block)
2014-12-04 13:07:25 +03:00
end
2018-04-04 22:44:39 +03:00
rescue StandardError => e
DevopsLogger.logger.error "#{e.message}:\n#{e.backtrace.join("\n")}"
2015-09-01 16:31:31 +03:00
end
private
2015-11-03 12:05:07 +03:00
def initialize_devops()
2015-09-01 16:31:31 +03:00
DevopsLogger.logger = logger
DevopsConfig.read
DevopsService.init
end
# outputs to file
2015-11-03 12:05:07 +03:00
def call_async()
2018-04-04 22:44:39 +03:00
DevopsLogger.logger = logger
# initialize_devops()
2015-09-01 16:31:31 +03:00
dir = DevopsConfig[:report_dir_v2]
# directory is created on server start in config.ru
@file = File.join(dir, jid)
2015-09-01 16:31:31 +03:00
2018-04-04 22:44:39 +03:00
cnt = 0
begin
@task = Devops::Model::JobTask.find(jid)
rescue Mongoid::Errors::DocumentNotFound
sleep(1)
cnt += 1
if cnt == 5
logger.error("Can not find report with id '#{jid}'")
return
end
retry
end
2015-11-03 12:05:07 +03:00
update_job_status(STATUS::INIT, nil)
File.open(@file, "w") do |out|
2018-04-04 22:44:39 +03:00
set_task_data({"file" => @file})
2014-12-04 13:07:25 +03:00
begin
2015-11-03 12:05:07 +03:00
update_job_status(STATUS::RUNNING, nil)
2015-09-11 12:05:13 +03:00
self.out = out
2015-09-01 16:31:31 +03:00
2018-04-04 22:44:39 +03:00
job_result = yield(out, @task)
2015-09-01 16:31:31 +03:00
canonical_status = (job_result == 0 ? STATUS::COMPLETED : STATUS::FAILED)
2015-11-03 12:05:07 +03:00
update_job_status(canonical_status, job_result)
2018-04-04 22:44:39 +03:00
rescue StandardError, Devops::Exception::RecordNotFound => e
2015-09-01 16:31:31 +03:00
out << "\n #{e.class}\n #{e.message}\n"
2014-12-04 13:07:25 +03:00
out << e.backtrace.join("\n")
2015-11-03 12:05:07 +03:00
update_job_status(STATUS::FAILED, -100)
2014-12-04 13:07:25 +03:00
end
end
2018-04-04 22:44:39 +03:00
ensure
Sidekiq.redis {|con| con.hdel "devops", jid}
end
def set_task_data data
@task.update_attributes(data)
2014-12-04 13:07:25 +03:00
end
2015-09-01 16:31:31 +03:00
# outputs to STDOUT
2015-11-03 12:05:07 +03:00
def call_sync()
2015-09-01 16:31:31 +03:00
out = STDOUT
begin
2015-11-03 12:05:07 +03:00
yield(out, '')
2018-04-04 22:44:39 +03:00
rescue StandardError, Devops::Exception::RecordNotFound => e
2015-09-01 16:31:31 +03:00
out << "\n"
out << e.message
out << "\n"
out << e.backtrace.join("\n")
end
end
2015-11-03 12:05:07 +03:00
def update_job_status(status, job_result_code)
2018-04-04 22:44:39 +03:00
# Worker.set_status(jid, status)
@task.update_attributes(status: status, job_result_code: job_result_code)
2015-09-01 16:31:31 +03:00
status
end
2014-10-22 15:01:55 +04:00
end