refactore job starter
This commit is contained in:
parent
f805ea6bce
commit
f02b905716
@ -11,7 +11,8 @@ module Devops
|
||||
set_parser Devops::API2_0::Parser::KeyParser
|
||||
|
||||
def keys
|
||||
Devops::Db.connector.keys({}, {path: false})
|
||||
# Devops::Db.connector.keys({}, {path: false})
|
||||
Devops::Db.connector.keys({})
|
||||
end
|
||||
|
||||
def create
|
||||
|
||||
@ -18,20 +18,6 @@ module Devops
|
||||
Devops::Db.connector.stacks(provider)
|
||||
end
|
||||
|
||||
=begin
|
||||
def create_stack object
|
||||
object[:owner] = owner_from_request
|
||||
file = JobStarter.start_job(:worker, :stack_bootstrap,
|
||||
provider: object['provider'],
|
||||
stack_attributes: object,
|
||||
request: @request
|
||||
)
|
||||
puts "Syncing report is located here: #{file}"
|
||||
|
||||
stack
|
||||
end
|
||||
=end
|
||||
|
||||
def create_stack
|
||||
object = parser.create
|
||||
project = Devops::Db.connector.project(object["project"])
|
||||
@ -40,10 +26,9 @@ module Devops
|
||||
object["stack_template"] = env.stack_template
|
||||
object["owner"] = parser.current_user
|
||||
|
||||
file = JobStarter.start_job(:worker, :stack_bootstrap,
|
||||
provider: env.provider,
|
||||
stack_attributes: object,
|
||||
request: @request
|
||||
file = Worker.start_async(StackBootstrapWorker, @request,
|
||||
provider_name: env.provider,
|
||||
stack_attributes: object
|
||||
)
|
||||
puts "Syncing report is located here: #{file}"
|
||||
[file]
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
require 'json'
|
||||
require 'lib/stack_presets/factory'
|
||||
require 'workers/stack_bootstrap_worker'
|
||||
require 'workers/job_starter'
|
||||
require "app/api2/parsers/stack_preset"
|
||||
require_relative "request_handler"
|
||||
|
||||
|
||||
@ -1,9 +0,0 @@
|
||||
module CommandsStorage
|
||||
def self.add_job_lambda(job_with_lambda)
|
||||
job_lambdas.merge!(job_with_lambda)
|
||||
end
|
||||
|
||||
def self.job_lambdas
|
||||
@job_lambdas ||= {}
|
||||
end
|
||||
end
|
||||
@ -1,9 +1,7 @@
|
||||
require 'commands/commands_storage'
|
||||
|
||||
module StackCommands
|
||||
extend self
|
||||
|
||||
def sync_bootstrap_proc
|
||||
def sync_stack_proc
|
||||
lambda do |out, stack, mongo|
|
||||
# two tries each 4 seconds, then 5 tries each 10 seconds, then 10 tries each 30 seconds.
|
||||
sleep_times = [4]*2 + [10]*5 + [30]*10
|
||||
@ -47,8 +45,4 @@ module StackCommands
|
||||
end
|
||||
=end
|
||||
|
||||
CommandsStorage.add_job_lambda(
|
||||
sync_bootstrap: sync_bootstrap_proc
|
||||
)
|
||||
|
||||
end
|
||||
|
||||
@ -21,10 +21,10 @@ unless File.exists?(config[:devops_dir])
|
||||
puts "Directory '#{config[:devops_dir]}' has been created"
|
||||
end
|
||||
|
||||
config[:report_dir_v2] = File.expand_path(File.join(config[:devops_dir], "report", "v2")) unless config[:report_dir_v2]
|
||||
[
|
||||
:report_dir_v2
|
||||
].each {|key| d = config[key]; FileUtils.mkdir_p(d) unless File.exists?(d) }
|
||||
[:report_dir_v2].each do |key|
|
||||
directory = config[key]
|
||||
FileUtils.mkdir_p(directory) unless File.exists?(directory)
|
||||
end
|
||||
|
||||
DevopsService.init
|
||||
puts Devops::Routes.routes
|
||||
|
||||
@ -27,6 +27,8 @@ class DevopsConfig
|
||||
addr.ip_address
|
||||
end
|
||||
|
||||
config[:report_dir_v2] = File.expand_path(File.join(config[:devops_dir], "report", "v2"))
|
||||
|
||||
DevopsConfig.config = config
|
||||
end
|
||||
|
||||
|
||||
@ -199,7 +199,7 @@ module Devops
|
||||
# @out << roll_back
|
||||
# mongo.server_delete @server.id
|
||||
end
|
||||
return status
|
||||
status
|
||||
end
|
||||
|
||||
def check_server
|
||||
|
||||
@ -1,27 +0,0 @@
|
||||
require 'commands/commands_storage'
|
||||
require 'workers/workers_storage'
|
||||
|
||||
module JobStarter
|
||||
def self.start_job(strategy, job_name, job_options)
|
||||
case strategy
|
||||
when :worker
|
||||
start_job_as_worker(WorkersStorage.workers[job_name], job_options)
|
||||
end
|
||||
end
|
||||
|
||||
def self.start_job_as_worker(worker_class, options)
|
||||
job_options = options.dup
|
||||
job_options[:config] ||= DevopsConfig.config
|
||||
job_options[:dir] ||= DevopsConfig[:report_dir_v2]
|
||||
job_options[:url] ||= options[:request].url
|
||||
|
||||
jid = worker_class.perform_async(job_options)
|
||||
Worker.set_status jid, Worker::STATUS::IN_QUEUE
|
||||
DevopsLogger.logger.info "Job '#{jid}' has been started"
|
||||
|
||||
|
||||
uri = URI.parse(job_options[:url])
|
||||
uri.path = "#{job_options[:config][:url_prefix]}/v2.0/report/#{jid}"
|
||||
uri.to_s
|
||||
end
|
||||
end
|
||||
@ -7,46 +7,50 @@ require "db/mongo/models/report"
|
||||
class StackBootstrapWorker < Worker
|
||||
include StackCommands
|
||||
|
||||
# besides options came from JobStarter we need:
|
||||
# :provider
|
||||
# :stack_id
|
||||
def perform(options)
|
||||
call(options['config'], options['provider'], options['dir']) do |provider, out, file|
|
||||
attrs = options['stack_attributes']
|
||||
mongo = ::Devops::Db.connector
|
||||
report = ::Devops::Model::Report.new(
|
||||
"file" => file,
|
||||
"_id" => jid,
|
||||
"created_by" => attrs['owner'],
|
||||
"project" => attrs["project"],
|
||||
"deploy_env" => attrs["deploy_env"],
|
||||
"type" => ::Devops::Model::Report::STACK_TYPE
|
||||
)
|
||||
mongo.save_report(report)
|
||||
provider_name = options.fetch('provider_name')
|
||||
stack_attrs = options.fetch('stack_attributes')
|
||||
|
||||
stack = Devops::Model::StackFactory.create(options['provider'], options['stack_attributes'], out)
|
||||
# stack.owner = attrs['owner']
|
||||
call(provider_name) do |provider, out, file|
|
||||
without_bootstrap = stack_attrs.delete('without_bootstrap')
|
||||
save_report(file, stack_attrs)
|
||||
|
||||
stack = Devops::Model::StackFactory.create(provider_name, stack_attrs, out)
|
||||
mongo.stack_insert(stack)
|
||||
|
||||
r = sync_bootstrap_proc.call(out, stack, mongo)
|
||||
if r == 0
|
||||
operation_result = sync_stack_proc.call(out, stack, mongo)
|
||||
if operation_result == 0
|
||||
out << "\nStack '#{stack.name}' has been created\n"
|
||||
out.flush
|
||||
servers = persist_stack_servers!(stack, provider)
|
||||
out << "\n"
|
||||
unless attrs['without_bootstrap']
|
||||
statuses = servers.map do |s|
|
||||
Devops::Executor::ServerExecutor.new(s, out).two_phase_bootstrap({})
|
||||
end
|
||||
end
|
||||
bootstrap_servers(servers, out) unless without_bootstrap
|
||||
end
|
||||
operation_result
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def bootstrap_servers(servers, out)
|
||||
out << "\n"
|
||||
servers.map do |server|
|
||||
Devops::Executor::ServerExecutor.new(server, out).two_phase_bootstrap({})
|
||||
end
|
||||
end
|
||||
|
||||
def save_report(file, stack_attrs)
|
||||
report = ::Devops::Model::Report.new(
|
||||
"file" => file,
|
||||
"_id" => jid,
|
||||
"created_by" => stack_attrs['owner'],
|
||||
"project" => stack_attrs["project"],
|
||||
"deploy_env" => stack_attrs["deploy_env"],
|
||||
"type" => ::Devops::Model::Report::STACK_TYPE
|
||||
)
|
||||
mongo.save_report(report)
|
||||
end
|
||||
|
||||
def persist_stack_servers!(stack, provider)
|
||||
mongo = ::Devops::Db.connector
|
||||
project = mongo.project(stack.project)
|
||||
deploy_env = project.deploy_env(stack.deploy_env)
|
||||
|
||||
@ -74,5 +78,3 @@ class StackBootstrapWorker < Worker
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
WorkersStorage.add_worker(stack_bootstrap: StackBootstrapWorker)
|
||||
|
||||
@ -11,7 +11,6 @@ require "core/devops-config"
|
||||
require "core/devops-logger"
|
||||
require "core/devops-db"
|
||||
require "providers/provider_factory"
|
||||
require 'workers/workers_storage'
|
||||
require "lib/knife/knife_factory"
|
||||
|
||||
class Worker
|
||||
@ -25,6 +24,95 @@ class Worker
|
||||
IN_QUEUE = "queued"
|
||||
end
|
||||
|
||||
def self.start_async(worker_class, request, job_options)
|
||||
jid = worker_class.perform_async(job_options.dup)
|
||||
Worker.set_status jid, Worker::STATUS::IN_QUEUE
|
||||
DevopsLogger.logger.info "Job '#{jid}' has been started"
|
||||
|
||||
uri = URI.parse(request.url)
|
||||
uri.path = "#{DevopsConfig[:url_prefix]}/v2.0/report/#{jid}"
|
||||
uri.to_s
|
||||
end
|
||||
|
||||
def self.set_status id, status
|
||||
Sidekiq.redis {|con| con.hset "devops", id, status}
|
||||
end
|
||||
|
||||
def call provider_name, &block
|
||||
begin
|
||||
initialize_devops(provider_name)
|
||||
provider = ::Provider::ProviderFactory.get(provider_name) if provider_name
|
||||
if jid
|
||||
call_async(provider, &block)
|
||||
else
|
||||
call_sync(provider, &block)
|
||||
end
|
||||
rescue StandardError => e
|
||||
puts e.message
|
||||
puts e.backtrace.join("\n")
|
||||
DevopsLogger.logger.error e.message
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def initialize_devops(provider_name)
|
||||
DevopsLogger.logger = logger
|
||||
DevopsConfig.read
|
||||
DevopsService.init
|
||||
::Provider::ProviderFactory.init(DevopsConfig.config)
|
||||
end
|
||||
|
||||
# outputs to file
|
||||
def call_async(provider)
|
||||
dir = DevopsConfig[:report_dir_v2]
|
||||
# directory is created on server start in config.ru
|
||||
file = File.join(dir, jid)
|
||||
|
||||
update_job_status(STATUS::INIT)
|
||||
File.open(file, "w") do |out|
|
||||
begin
|
||||
update_job_status(STATUS::RUNNING)
|
||||
|
||||
job_result = yield(provider, out, file)
|
||||
canonical_status = (job_result == 0 ? STATUS::COMPLETED : STATUS::FAILED)
|
||||
update_job_status(canonical_status)
|
||||
rescue StandardError, RecordNotFound => e
|
||||
out << "\n #{e.class}\n #{e.message}\n"
|
||||
out << e.backtrace.join("\n")
|
||||
update_job_status(STATUS::FAILED)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# outputs to STDOUT
|
||||
def call_sync
|
||||
out = STDOUT
|
||||
begin
|
||||
yield(provider, out, '')
|
||||
rescue StandardError, RecordNotFound => e
|
||||
out << "\n"
|
||||
out << e.message
|
||||
out << "\n"
|
||||
out << e.backtrace.join("\n")
|
||||
end
|
||||
end
|
||||
|
||||
def mongo
|
||||
::Devops::Db.connector
|
||||
end
|
||||
|
||||
def update_job_status(status)
|
||||
set_status(jid, status)
|
||||
mongo.set_report_status(jid, status)
|
||||
status
|
||||
end
|
||||
|
||||
def init_provider(provider_name)
|
||||
::Provider::ProviderFactory.init(DevopsConfig.config)
|
||||
::Provider::ProviderFactory.get(provider_name) if provider_name
|
||||
end
|
||||
|
||||
def convert_config conf
|
||||
config = {}
|
||||
conf.each {|k,v| config[k.is_a?(String) ? k.to_sym : k] = v}
|
||||
@ -33,54 +121,7 @@ class Worker
|
||||
end
|
||||
|
||||
def set_status id, status
|
||||
Sidekiq.redis {|con| con.hset "devops", id, status}
|
||||
end
|
||||
|
||||
# it is called from creating server handler. But maybe we can somehow refactore code
|
||||
# to get rid of duplication?
|
||||
# TODO: check it
|
||||
def self.set_status id, status
|
||||
Sidekiq.redis {|con| con.hset "devops", id, status}
|
||||
end
|
||||
|
||||
def call conf, e_provider, dir
|
||||
DevopsLogger.logger = logger
|
||||
FileUtils.mkdir_p(dir) unless File.exists?(dir)
|
||||
set_status jid, "init"
|
||||
DevopsConfig.config = convert_config(conf)
|
||||
DevopsService.init
|
||||
file = File.join(dir, jid)
|
||||
error = nil
|
||||
provider = nil
|
||||
begin
|
||||
::Provider::ProviderFactory.init(DevopsConfig.config)
|
||||
unless e_provider.nil?
|
||||
provider = ::Provider::ProviderFactory.get(e_provider)
|
||||
end
|
||||
rescue Exception => e
|
||||
error = e
|
||||
DevopsLogger.logger.error e.message
|
||||
return
|
||||
end
|
||||
mongo = ::Devops::Db.connector
|
||||
File.open(file, "w") do |out|
|
||||
begin
|
||||
set_status jid, STATUS::RUNNING
|
||||
raise error unless error.nil?
|
||||
status = yield(provider, out, file)
|
||||
status = (status == 0 ? STATUS::COMPLETED : STATUS::FAILED)
|
||||
set_status jid, status
|
||||
mongo.set_report_status(jid, status)
|
||||
status
|
||||
rescue Exception => e
|
||||
out << "\n"
|
||||
out << e.message
|
||||
out << "\n"
|
||||
out << e.backtrace.join("\n")
|
||||
set_status jid, STATUS::FAILED
|
||||
mongo.set_report_status(jid, STATUS::FAILED) unless mongo.nil?
|
||||
end
|
||||
end
|
||||
self.class.set_status(id, status)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
@ -1,9 +0,0 @@
|
||||
module WorkersStorage
|
||||
def self.add_worker(job_with_worker)
|
||||
workers.merge!(job_with_worker)
|
||||
end
|
||||
|
||||
def self.workers
|
||||
@job_workers ||= {}
|
||||
end
|
||||
end
|
||||
Loading…
Reference in New Issue
Block a user