CID-472: lock persisting before deleting stale servers, too
This commit is contained in:
		
							parent
							
								
									8b379bc2a2
								
							
						
					
					
						commit
						0e299149bf
					
				| @ -15,7 +15,7 @@ module Devops | |||||||
|       def initialize(options) |       def initialize(options) | ||||||
|         @out = options.fetch(:out) |         @out = options.fetch(:out) | ||||||
|         @stack = options[:stack] |         @stack = options[:stack] | ||||||
|         self.just_persisted_by_priority = {} |         @just_persisted_by_priority = {} | ||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
|       def create_stack(stack_attrs) |       def create_stack(stack_attrs) | ||||||
| @ -31,33 +31,45 @@ module Devops | |||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
|       def persist_new_servers |       def persist_new_servers | ||||||
|         reload_stack |         fetcher.reset_states | ||||||
|         raise 'It seems that stack is synchronizing at the moment' if stack.persisting_is_locked |         servers_to_persist = fetcher.servers_to_persist | ||||||
|         begin |         if servers_to_persist.any? | ||||||
|           stack.lock_persisting! |           out.puts 'Servers to persist:' | ||||||
|           fetcher.new_servers_by_priorities.each do |priority, provider_infos| |           servers_to_persist.each { |info| out.puts JSON.pretty_generate(info) } | ||||||
|             servers = provider_infos.map {|info| persister.persist(info) } |           puts_and_flush "\n\nPersisting new servers:" | ||||||
|             just_persisted_by_priority[priority] = servers |         else | ||||||
|  |           puts_and_flush 'There are no servers to persist.' | ||||||
|  |           return | ||||||
|         end |         end | ||||||
| 
 | 
 | ||||||
|           just_persisted_by_priority.values.flatten.each do |server| |         fetcher.new_servers_by_priorities.each do |priority, provider_infos| | ||||||
|             puts_and_flush "\n\nPersisted server #{server.id}: #{JSON.pretty_generate(server.to_hash)}" |           servers = provider_infos.map {|info| persister.persist(info) } | ||||||
|           end |           @just_persisted_by_priority[priority] = servers | ||||||
|         ensure |  | ||||||
|           stack.unlock_persisting! |  | ||||||
|         end |         end | ||||||
|  |         puts_and_flush "New servers have been persisted" | ||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
|       def bootstrap_just_persisted(jid) |       def bootstrap_just_persisted(jid) | ||||||
|         puts_and_flush "Bootstrapping just persisted servers" if just_persisted_by_priority.values.flatten.any? |         puts_and_flush "Bootstrapping just persisted servers" if @just_persisted_by_priority.values.flatten.any? | ||||||
|         just_persisted_by_priority.each do |priority, servers| |         @just_persisted_by_priority.each do |priority, servers| | ||||||
|           puts_and_flush "Servers with priority '#{priority}': #{servers.map(&:id).join(", ")}" |           puts_and_flush "Servers with priority '#{priority}': #{servers.map(&:id).join(", ")}" | ||||||
|         end |         end | ||||||
|         PrioritizedGroupsBootstrapper.new(out, jid, just_persisted_by_priority).bootstrap_servers_by_priority |         PrioritizedGroupsBootstrapper.new(out, jid, @just_persisted_by_priority).bootstrap_servers_by_priority | ||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
|       def delete_stale_servers |       def delete_stale_servers | ||||||
|         fetcher.stale_servers.each do |server| |         fetcher.reset_states | ||||||
|  |         servers_to_delete = fetcher.stale_servers | ||||||
|  |         if servers_to_delete.any? | ||||||
|  |           out.puts "\nStale servers:" | ||||||
|  |           servers_to_delete.each { |server| out.puts JSON.pretty_generate(server.to_hash) } | ||||||
|  |           puts_and_flush 'Deleting stale servers.' | ||||||
|  |         else | ||||||
|  |           puts_and_flush "\nThere are no stale servers." | ||||||
|  |           return | ||||||
|  |         end | ||||||
|  | 
 | ||||||
|  |         servers_to_delete.each do |server| | ||||||
|           proper_remove_server(server) |           proper_remove_server(server) | ||||||
|         end |         end | ||||||
|       end |       end | ||||||
| @ -76,8 +88,6 @@ module Devops | |||||||
| 
 | 
 | ||||||
|       private |       private | ||||||
| 
 | 
 | ||||||
|       attr_accessor :just_persisted_by_priority |  | ||||||
| 
 |  | ||||||
|       def mongo |       def mongo | ||||||
|         Devops::Db.connector |         Devops::Db.connector | ||||||
|       end |       end | ||||||
| @ -91,10 +101,6 @@ module Devops | |||||||
|         end |         end | ||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
|       def reload_stack |  | ||||||
|         @stack = mongo.stack(@stack.name) |  | ||||||
|       end |  | ||||||
| 
 |  | ||||||
|       def fetcher |       def fetcher | ||||||
|         @fetcher ||= StackServersFetcher.new(stack, out) |         @fetcher ||= StackServersFetcher.new(stack, out) | ||||||
|       end |       end | ||||||
|  | |||||||
| @ -36,23 +36,7 @@ class Devops::Executor::StackExecutor | |||||||
|       @by_state[STALE] |       @by_state[STALE] | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     private |     def set_states | ||||||
| 
 |  | ||||||
|     def fetch |  | ||||||
|       @servers_info = @stack.provider_instance.stack_servers(@stack) |  | ||||||
|       divide_into_states |  | ||||||
|       output_fetched_servers |  | ||||||
|     end |  | ||||||
| 
 |  | ||||||
|     def priority_from_info(provider_info) |  | ||||||
|       if provider_info['tags'] |  | ||||||
|         priority = provider_info['tags']['cid:priority'].to_i |  | ||||||
|       else |  | ||||||
|         0 |  | ||||||
|       end |  | ||||||
|     end |  | ||||||
| 
 |  | ||||||
|     def divide_into_states |  | ||||||
|       persisted = Devops::Db.connector.stack_servers(@stack.name) |       persisted = Devops::Db.connector.stack_servers(@stack.name) | ||||||
|       persisted_ids = persisted.map(&:id) |       persisted_ids = persisted.map(&:id) | ||||||
|       in_cloud_ids = @servers_info.map {|info| info['id']} |       in_cloud_ids = @servers_info.map {|info| info['id']} | ||||||
| @ -69,29 +53,21 @@ class Devops::Executor::StackExecutor | |||||||
| 
 | 
 | ||||||
|       @by_state[STALE] = persisted.select { |server| deleted_ids.include?(server.id) } |       @by_state[STALE] = persisted.select { |server| deleted_ids.include?(server.id) } | ||||||
|     end |     end | ||||||
|  |     alias_method :reset_states, :set_states | ||||||
| 
 | 
 | ||||||
|     # Do not move it to stack executor till set events handling properly. |     private | ||||||
|     # For now there may be already persisted servers on creation because of too early events from aws. | 
 | ||||||
|     def output_fetched_servers |     def fetch | ||||||
|       if servers_to_persist.any? |       @servers_info = @stack.provider_instance.stack_servers(@stack) | ||||||
|         out.puts 'Servers to persist:' |       reset_states | ||||||
|         servers_to_persist.each { |info| out.puts JSON.pretty_generate(info) } |     end | ||||||
|  | 
 | ||||||
|  |     def priority_from_info(provider_info) | ||||||
|  |       if provider_info['tags'] | ||||||
|  |         priority = provider_info['tags']['cid:priority'].to_i | ||||||
|       else |       else | ||||||
|         out.puts 'There are no servers to persist.' |         0 | ||||||
|       end |       end | ||||||
| 
 |  | ||||||
|       if already_persisted_servers.any? |  | ||||||
|         out.puts "\nAlready persisted servers:" |  | ||||||
|         already_persisted_servers.each { |info| out.puts JSON.pretty_generate(info) } |  | ||||||
|       end |  | ||||||
| 
 |  | ||||||
|       if stale_servers.any? |  | ||||||
|         out.puts "\nStale servers:" |  | ||||||
|         stale_servers.each { |server| out.puts JSON.pretty_generate(server.to_hash) } |  | ||||||
|       else |  | ||||||
|         out.puts "\nThere are no stale servers." |  | ||||||
|       end |  | ||||||
|       out.flush |  | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
| end | end | ||||||
| @ -8,26 +8,26 @@ class Devops::Executor::StackExecutor | |||||||
|     let(:executor_with_stack) { described_class.new(out: out, stack: stack) } |     let(:executor_with_stack) { described_class.new(out: out, stack: stack) } | ||||||
|     let(:new_servers_by_priorities) { |     let(:new_servers_by_priorities) { | ||||||
|       { |       { | ||||||
|         0 => [{id: 1}], |         0 => [{'id' => 1}], | ||||||
|         2 => [{id: 2}] |         2 => [{'id' => 2}] | ||||||
|       } |  | ||||||
|     } |  | ||||||
|     let(:just_persisted_by_priority) { |  | ||||||
|       { |  | ||||||
|         0 => [double('info 1', id: 1)], |  | ||||||
|         2 => [double('info 2', id: 2)] |  | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|     let(:fetcher) { |     let(:fetcher) { | ||||||
|       instance_double(StackServersFetcher, |       instance_double(StackServersFetcher, | ||||||
|  |         servers_to_persist: new_servers_by_priorities.values.flatten, | ||||||
|         new_servers_by_priorities: new_servers_by_priorities, |         new_servers_by_priorities: new_servers_by_priorities, | ||||||
|         stale_servers: build_list(:server, 2), |         stale_servers: build_list(:server, 2), | ||||||
|         fetch: nil |         fetch: nil, | ||||||
|  |         reset_states: nil | ||||||
|       ) |       ) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     let(:persister) { instance_double(StackServersPersister) } | ||||||
|  | 
 | ||||||
|     before do |     before do | ||||||
|       allow(executor_with_stack).to receive(:fetcher) { fetcher } |       allow(executor_with_stack).to receive(:fetcher) { fetcher } | ||||||
|  |       allow(persister).to receive(:persist) { |info| build(:server, id: info['id']) } | ||||||
|  |       allow(executor_with_stack).to receive(:persister) { persister } | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @ -72,12 +72,7 @@ class Devops::Executor::StackExecutor | |||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     describe '#persist_new_servers' do |     describe '#persist_new_servers' do | ||||||
|       let(:persister) { instance_double(StackServersPersister, persist: build(:server)) } |  | ||||||
| 
 |  | ||||||
|       before do |       before do | ||||||
|         allow(executor_with_stack).to receive(:persister) { persister } |  | ||||||
|         allow(stack).to receive(:lock_persisting!) |  | ||||||
|         allow(stack).to receive(:unlock_persisting!) |  | ||||||
|         allow(stubbed_connector).to receive(:stack) { stack } |         allow(stubbed_connector).to receive(:stack) { stack } | ||||||
|       end |       end | ||||||
| 
 | 
 | ||||||
| @ -85,28 +80,13 @@ class Devops::Executor::StackExecutor | |||||||
|         expect(persister).to receive(:persist).exactly(2).times |         expect(persister).to receive(:persist).exactly(2).times | ||||||
|         executor_with_stack.persist_new_servers |         executor_with_stack.persist_new_servers | ||||||
|       end |       end | ||||||
| 
 |  | ||||||
|       it 'locks persisting of a stack before start and unlocks after finish' do |  | ||||||
|         expect(stack).to receive(:lock_persisting!).ordered |  | ||||||
|         expect(persister).to receive(:persist).ordered |  | ||||||
|         expect(stack).to receive(:unlock_persisting!).ordered |  | ||||||
|         executor_with_stack.persist_new_servers |  | ||||||
|       end |  | ||||||
| 
 |  | ||||||
|       it 'unlocks persisting even in case of failures' do |  | ||||||
|         allow(persister).to receive(:persist) { raise } |  | ||||||
|         expect(stack).to receive(:unlock_persisting!) |  | ||||||
|         expect { executor_with_stack.persist_new_servers }.to raise_error StandardError |  | ||||||
|       end |  | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     describe '#bootstrap_just_persisted' do |     describe '#bootstrap_just_persisted' do | ||||||
|       it 'calls PrioritizedGroupsBootstrapper#bootstrap_servers_by_priority' do |       it 'calls PrioritizedGroupsBootstrapper#bootstrap_servers_by_priority' do | ||||||
|         result = double('bootstrap_result') |  | ||||||
|         allow(executor_with_stack).to receive(:just_persisted_by_priority) { just_persisted_by_priority } |  | ||||||
|         allow_any_instance_of(PrioritizedGroupsBootstrapper).to receive(:bootstrap_servers_by_priority) { result } |  | ||||||
|         expect_any_instance_of(PrioritizedGroupsBootstrapper).to receive(:bootstrap_servers_by_priority) |         expect_any_instance_of(PrioritizedGroupsBootstrapper).to receive(:bootstrap_servers_by_priority) | ||||||
|         expect(executor_with_stack.bootstrap_just_persisted(1000)).to eq result |         executor_with_stack.persist_new_servers | ||||||
|  |         executor_with_stack.bootstrap_just_persisted(1000) | ||||||
|       end |       end | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										57
									
								
								devops-service/spec/workers/stack_sync_worker_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										57
									
								
								devops-service/spec/workers/stack_sync_worker_spec.rb
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,57 @@ | |||||||
|  | require 'workers/stack_sync_worker' | ||||||
|  | require 'lib/executors/stack_executor' | ||||||
|  | 
 | ||||||
|  | RSpec.describe StackSyncWorker, type: :worker, stubbed_connector: true, init_messages: true do | ||||||
|  |   let(:worker) { described_class.new } | ||||||
|  |   let(:stack) { build(:stack) } | ||||||
|  |   let(:perform) { worker.perform('stack_name' => 'stack') } | ||||||
|  |   let(:executor) { | ||||||
|  |     instance_double(Devops::Executor::StackExecutor, | ||||||
|  |       delete_stale_servers: nil, | ||||||
|  |       persist_new_servers: nil, | ||||||
|  |       bootstrap_just_persisted: double('result', reason: 'ok', code: 0) | ||||||
|  |     ) | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   before do | ||||||
|  |     allow(worker).to receive(:update_report) | ||||||
|  |     allow(worker).to receive(:executor) { executor } | ||||||
|  |     allow(worker).to receive(:sleep) | ||||||
|  |     allow(stubbed_connector).to receive(:stack) { stack } | ||||||
|  |     allow(stubbed_connector).to receive(:lock_persisting_stack) | ||||||
|  |     allow(stubbed_connector).to receive(:unlock_persisting_stack) | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   it 'saves report with owner set to SYSTEM' do | ||||||
|  |     expect(worker).to receive(:update_report) do |options| | ||||||
|  |       expect(options).to include('created_by' => 'SYSTEM') | ||||||
|  |     end | ||||||
|  |     perform | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   it 'locks persisting of a stack before start and unlocks after finish' do | ||||||
|  |     expect(worker).to receive(:wait_until_stack_is_unlocked).ordered | ||||||
|  |     expect(stack).to receive(:lock_persisting!).ordered | ||||||
|  |     expect(executor).to receive(:delete_stale_servers).ordered | ||||||
|  |     expect(executor).to receive(:persist_new_servers).ordered | ||||||
|  |     expect(stack).to receive(:unlock_persisting!).ordered | ||||||
|  |     perform | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   it 'unlocks persisting even in case of failures' do | ||||||
|  |     allow(executor).to receive(:persist_new_servers) { raise } | ||||||
|  |     expect(stack).to receive(:unlock_persisting!) | ||||||
|  |     expect { perform }.to raise_error StandardError | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   it 'waites until stack is unlocked' do | ||||||
|  |     allow(stubbed_connector).to receive(:stack).and_return( | ||||||
|  |       build(:stack, persisting_is_locked: true), | ||||||
|  |       build(:stack, persisting_is_locked: true), | ||||||
|  |       build(:stack, persisting_is_locked: false) | ||||||
|  |     ) | ||||||
|  |     expect(worker).to receive(:sleep).exactly(2).times | ||||||
|  |     perform | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  | end | ||||||
| @ -60,12 +60,8 @@ class StackBootstrapWorker < Worker | |||||||
| 
 | 
 | ||||||
|   def rollback_stack! |   def rollback_stack! | ||||||
|     puts_and_flush "\nStart rollback of a stack" |     puts_and_flush "\nStart rollback of a stack" | ||||||
|     begin |  | ||||||
|     executor.delete_stack |     executor.delete_stack | ||||||
|     puts_and_flush "Stack rollback has been completed" |     puts_and_flush "Stack rollback has been completed" | ||||||
|     rescue StandardError |  | ||||||
|       puts_and_flush "Stack rollback failed" |  | ||||||
|     end |  | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def save_report(stack_attrs) |   def save_report(stack_attrs) | ||||||
|  | |||||||
| @ -2,7 +2,7 @@ require 'lib/executors/stack_executor' | |||||||
| 
 | 
 | ||||||
| class StackSyncWorker < Worker | class StackSyncWorker < Worker | ||||||
| 
 | 
 | ||||||
|   MAX_LOCK_WAITING_TIME = 15000 |   MAX_LOCK_WAITING_TIME = 6000 | ||||||
| 
 | 
 | ||||||
|   # @options: |   # @options: | ||||||
|   #   'stack_name'    required |   #   'stack_name'    required | ||||||
| @ -12,14 +12,16 @@ class StackSyncWorker < Worker | |||||||
|       stack_name = options.fetch('stack_name') |       stack_name = options.fetch('stack_name') | ||||||
|       created_by = options['created_by'] || ::Devops::Model::Report::SYSTEM_OWNER |       created_by = options['created_by'] || ::Devops::Model::Report::SYSTEM_OWNER | ||||||
|       @stack = mongo.stack(stack_name) |       @stack = mongo.stack(stack_name) | ||||||
| 
 |  | ||||||
|       save_report(created_by) |       save_report(created_by) | ||||||
|       wait_until_stack_is_unlocked |  | ||||||
|       puts_and_flush 'Persisting new servers.' |  | ||||||
|       executor.persist_new_servers |  | ||||||
| 
 | 
 | ||||||
|       puts_and_flush "\n\nDeleting stale servers." |       wait_until_stack_is_unlocked | ||||||
|  |       begin | ||||||
|  |         @stack.lock_persisting! | ||||||
|         executor.delete_stale_servers |         executor.delete_stale_servers | ||||||
|  |         executor.persist_new_servers | ||||||
|  |       ensure | ||||||
|  |         @stack.unlock_persisting! | ||||||
|  |       end | ||||||
| 
 | 
 | ||||||
|       puts_and_flush "\n\nBootstrapping just persisted servers." |       puts_and_flush "\n\nBootstrapping just persisted servers." | ||||||
|       bootstrap_result = executor.bootstrap_just_persisted(jid) |       bootstrap_result = executor.bootstrap_just_persisted(jid) | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 Anton Chuchkalov
						Anton Chuchkalov