root = File.join(File.dirname(__FILE__), "..") $LOAD_PATH.push root unless $LOAD_PATH.include? root require 'tilt/erubis' require "sidekiq" require "sidekiq/api" require "fileutils" require "lib/knife/knife_factory" require "lib/puts_and_flush" require "db/mongo/models/job_task" require "db/mongo/models/server" require "lib/executors/server_executor" # All options keys MUST be a symbol!!! class Worker include Sidekiq::Worker include PutsAndFlush # sidekiq_options :queue => :cid, :retry => false sidekiq_options :retry => false attr_accessor :out module STATUS INIT = "init" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" IN_QUEUE = "queued" end def self.start_async(worker_class, job_options) jid = worker_class.perform_async(job_options.dup) Devops::Model::JobTask.create({ "_id" => jid, "status" => Worker::STATUS::IN_QUEUE }) DevopsLogger.logger.info "Job '#{jid}' has been queued" jid end def self.start_sync(worker_class, job_options, out) stringified_options = job_options =begin job_options.each do |key, value| stringified_options[key.to_s] = value end =end w = worker_class.new w.out = out w.perform(stringified_options) end def self.set_status id, status Sidekiq.redis {|con| con.hset "devops", id, status} end def call &block if jid call_async(&block) else call_sync(&block) end rescue StandardError => e DevopsLogger.logger.error "#{e.message}:\n#{e.backtrace.join("\n")}" end private def initialize_devops() DevopsLogger.logger = logger DevopsConfig.read DevopsService.init end # outputs to file def call_async() DevopsLogger.logger = logger # initialize_devops() dir = DevopsConfig[:report_dir_v2] # directory is created on server start in config.ru @file = File.join(dir, jid) cnt = 0 begin @task = Devops::Model::JobTask.find(jid) rescue Mongoid::Errors::DocumentNotFound sleep(1) cnt += 1 if cnt == 5 logger.error("Can not find report with id '#{jid}'") return end retry end update_job_status(STATUS::INIT, nil) File.open(@file, "w") do |out| set_task_data({"file" => @file}) begin update_job_status(STATUS::RUNNING, nil) self.out = out job_result = yield(out, @task) canonical_status = (job_result == 0 ? STATUS::COMPLETED : STATUS::FAILED) update_job_status(canonical_status, job_result) rescue StandardError, Devops::Exception::RecordNotFound => e out << "\n #{e.class}\n #{e.message}\n" out << e.backtrace.join("\n") update_job_status(STATUS::FAILED, -100) end end ensure Sidekiq.redis {|con| con.hdel "devops", jid} end def set_task_data data @task.update_attributes(data) end # outputs to STDOUT def call_sync() out = STDOUT begin yield(out, '') rescue StandardError, Devops::Exception::RecordNotFound => e out << "\n" out << e.message out << "\n" out << e.backtrace.join("\n") end end def update_job_status(status, job_result_code) # Worker.set_status(jid, status) @task.update_attributes(status: status, job_result_code: job_result_code) status end end