Thank you @Bernhard_Suttner and @bastian-src for the replies! OK, I think I have resolved this. I believe it was solved simply by changing that validate from false to true, as mentioned in the comments above.
We are currently using Salt 3005.1 (upgrading to 3006 soon). Yes, I use a reactor to upload the reports. I have also slightly modified the Python upload script (mostly just to add logging), though since we have over 60,000 hosts I also changed the file lock to use fcntl.flock, so that an upload that can't take the lock waits and retries instead of failing outright (a quick sketch of the flock semantics follows). I also refactored foreman_salt/report_importer.rb for better logging. Both files are included in full further below.
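Here is a minimal, standalone sketch of those flock semantics (the lock path is made up; the real script below uses the non-blocking form plus a retry loop so it can give up after a timeout):

import fcntl
import os

fd = os.open('/tmp/example.lock', os.O_CREAT | os.O_RDWR)  # hypothetical path
fcntl.flock(fd, fcntl.LOCK_EX)  # blocking: the kernel parks us until the holder releases
# ... critical section: one upload at a time ...
fcntl.flock(fd, fcntl.LOCK_UN)
os.close(fd)

fd = os.open('/tmp/example.lock', os.O_CREAT | os.O_RDWR)
try:
    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # non-blocking variant
except BlockingIOError:  # raised immediately if the lock is held
    pass  # the real get_lock() below sleeps and retries instead
finally:
    os.close(fd)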
I did discover that setting “create host from facts upload” to True still causes duplicate hosts to be created. I looked into the fact uploader Ruby file, and there is no checking for existing hosts, so it could be refactored to handle that a bit better. But I'll be leaving that setting at False.
On the other hand, I can confidently say that I am no longer seeing duplicate hosts after my changes. The only direct change I made was switching the validate from false to true. I now have “create hosts from report” set to True. I let it run all weekend and I don't see a single duplicate host:
Loading production environment (Rails 6.1.7.7)
irb(main):001:0> Host.group(:name).having('COUNT(*) > 1').count.each { |name, count| puts "#{name}: #{count} instances" }
=> {}
irb(main):002:0> puts Host.count
5656
=> nil
irb(main):003:0> Host.group(:name).having('COUNT(*) > 1').count
=> {}
When I had validate left at false, I saw hosts getting duplicated on report uploads. Now, with it set to true, I can't find the exact error in the logs, but it says something like “host already exists” during the attempt to create the host. That is presumably the host name uniqueness validation rejecting the second save instead of inserting a duplicate row.
foreman_report_upload.py
# -*- coding: utf-8 -*-
'''
Uploads reports from the Salt job cache to Foreman
'''
from __future__ import absolute_import, print_function, unicode_literals # Import future statements for compatibility
# Define constants for file paths
LAST_UPLOADED = '/etc/salt/foreman/last_uploaded' # File to store the last uploaded job ID
FOREMAN_CONFIG = '/etc/salt/foreman.yaml' # Path to the Foreman configuration file
LOCK_FILE = '/etc/salt/foreman/salt-report-upload.lock' # Lock file to prevent concurrent uploads
# Try importing HTTP connection classes based on Python version
try:
from http.client import HTTPConnection, HTTPSConnection # Python 3.x
except ImportError:
    from httplib import HTTPConnection, HTTPSConnection  # Python 2.x
import io
import ssl
import json
import yaml
import os
import sys
import base64
import fcntl
import time
# Import python libs
import logging
log = logging.getLogger(__name__) # Initialize logger
# Define 'unicode' for Python 3 compatibility
if sys.version_info.major == 3:
unicode = str
def salt_config():
"""Load and return the Foreman configuration from a YAML file."""
with open(FOREMAN_CONFIG, 'r') as f:
        config = yaml.safe_load(f.read())  # safe_load needs no Loader= and avoids PyYAML's unsafe-load warning
return config
def write_last_uploaded(last_uploaded):
"""Write the last uploaded job ID to the LAST_UPLOADED file."""
with io.open(LAST_UPLOADED, 'w+') as f:
f.write(unicode(last_uploaded))
def upload(report):
"""Upload the report to the Foreman server."""
log.debug("Uploading report: %s", report)
config = salt_config() # Load configuration settings
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
    # We are not using client-side SSL because a load balancer terminates TLS for us.
    # We still have to connect over https because the load balancer expects it, even
    # though the underlying Foreman servers themselves do not.
    # So the https/cert check below is commented out and we always use HTTPSConnection without certs.
#if config[":proto"] == "https":
# ctx = ssl.create_default_context()
# ctx.load_cert_chain(certfile=config[":ssl_cert"], keyfile=config[":ssl_key"])
# if config[":ssl_ca"]:
# ctx.load_verify_locations(cafile=config[":ssl_ca"])
# connection = HTTPSConnection(config[":host"], port=config[":port"], context=ctx)
#else:
connection = HTTPSConnection(config[":host"], port=config[":port"]) # Establish HTTPS connection
# Add basic authentication if username and password are provided
if ":username" in config and ":password" in config:
token = base64.b64encode(
"{}:{}".format(config[":username"], config[":password"]).encode("utf-8")
)
headers["Authorization"] = "Basic {}".format(token.decode("utf-8"))
# Send a POST request to the Foreman API with the report
connection.request("POST", "/salt/api/v2/jobs/upload", json.dumps(report), headers)
response = connection.getresponse() # Get the response from the server
if response.status == 200:
write_last_uploaded(report['job']['job_id']) # Update the last uploaded job ID
info_msg = 'Success {0}: {1}'.format(report['job']['job_id'], response.read())
log.info(info_msg) # Log success message
else:
log.error("Unable to upload job - aborting report upload") # Log error message
log.error(response.read()) # Log server response
def create_report(json_str):
"""Create a report dictionary from the JSON string."""
msg = json.loads(json_str) # Load the JSON string into a dictionary
if msg['fun'] in ['state.highstate', 'state.apply']:
# Handle 'state.highstate' and 'state.apply' functions
return {'job':
{
'result': {
msg['id']: msg['return'], # Map minion ID to its return data
},
'function': msg['fun'], # Function name
'job_id': msg['jid'] # Job ID
}
}
elif msg['fun'] == 'state.template_str':
# Handle 'state.template_str' function
for key, entry in msg['return'].items():
if key.startswith('module_') and entry['__id__'] == 'state.highstate':
return {'job':
{
'result': {
msg['id']: next(iter(entry['changes'].values())),
},
'function': 'state.highstate',
'job_id': msg['jid']
}
}
raise Exception('No state.highstate or state.apply found') # Raise exception if function is unsupported
def get_lock(minion_id, timeout=10):
"""Attempt to obtain a lock within a timeout period."""
lock_fd = os.open(LOCK_FILE, os.O_CREAT | os.O_RDWR)
start_time = time.time()
while True:
try:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return lock_fd
except BlockingIOError:
elapsed_time = time.time() - start_time
if elapsed_time >= timeout:
log.error("Unable to obtain lock after %s seconds for minion id %s.", timeout, minion_id)
os.close(lock_fd)
return None
log.error("Lock is held by another process, retrying... (minion id: %s)", minion_id)
time.sleep(1)
def release_lock(lock_fd):
"""Release the lock."""
fcntl.flock(lock_fd, fcntl.LOCK_UN)
os.close(lock_fd)
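    # Note: unlinking the lock file below opens a small race: another uploader
    # may still hold an fd to the old inode while a newer process creates and
    # locks a fresh file at the same path. Keeping the file in place would
    # sidestep that, at the cost of a stray empty lock file.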
os.remove(LOCK_FILE)
def now(highstate):
log.debug('Upload highstate to Foreman')
    lock_fd = None
    minion_id = None  # set up front so the except clause can log safely even if create_report raises
try:
report = create_report(base64.b64decode(highstate))
# Extract minion ID from the report
minion_id = next(iter(report['job']['result']))
lock_fd = get_lock(minion_id) # Pass minion_id to get_lock
if lock_fd:
upload(report)
            log.info('Successfully uploaded report for minion id: %s', minion_id)
else:
log.error('Could not obtain lock, highstate not uploaded for minion id: %s', minion_id)
except Exception as exc:
log.error('Exception encountered for minion id %s: %s', minion_id, exc)
finally:
if lock_fd:
release_lock(lock_fd)
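For reference, here is roughly the event shape create_report expects after base64-decoding (a hand-written sketch; the minion ID, jid, and state key are made up):

sample_msg = {
    'fun': 'state.highstate',
    'id': 'minion1.example.com',    # minion ID (hypothetical)
    'jid': '20240101120000000000',  # Salt job ID (hypothetical)
    'return': {
        'file_|-motd_|-/etc/motd_|-managed': {
            'result': True,
            'changes': {},
            'comment': 'File /etc/motd is in the correct state',
            'duration': '1.23 ms',
        },
    },
}
# create_report(json.dumps(sample_msg)) then wraps it as:
# {'job': {'result': {'minion1.example.com': {...the return dict...}},
#          'function': 'state.highstate', 'job_id': '20240101120000000000'}}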
report_importer.py
[root@10-222-206-152 usr]# cat ./share/gems/gems/foreman_salt-16.0.2/app/services/foreman_salt/report_importer.rb
module ForemanSalt
class ReportImporter
delegate :logger, to: :Rails
attr_reader :report
# Define logger as a class method
def self.logger
Rails.logger
end
def self.import(raw, proxy_id = nil)
logger.info "Starting import with raw data: #{raw.inspect} and proxy_id: #{proxy_id}"
raise ::Foreman::Exception, _('Invalid report') unless raw.is_a?(Hash)
raw.map do |host, report|
logger.debug "Processing report for host: #{host}"
importer = ForemanSalt::ReportImporter.new(host, report, proxy_id)
importer.import
report = importer.report
report.origin = 'Salt'
report.save!
logger.info "Report saved for host: #{host}"
report
end
rescue => e
logger.error "Import failed: #{e.message}"
raise e
end
def initialize(host, raw, proxy_id = nil)
logger.debug "Initializing ReportImporter with host: #{host}, proxy_id: #{proxy_id}"
@host = find_or_create_host(host)
@raw = raw
@proxy_id = proxy_id
end
def import
logger.info "Processing report for #{@host}"
logger.debug { "Report raw data: #{@raw.inspect}" }
if @host.new_record? && !Setting[:create_new_host_when_report_is_uploaded]
logger.info("Skipping report for #{@host} as it's an unknown host and create_new_host_when_report_is_uploaded setting is disabled")
return ConfigReport.new
end
@host.salt_proxy_id ||= @proxy_id
@host.last_report = start_time
if [Array, String].include?(@raw.class)
logger.debug "Detected failures in raw data; processing failures"
process_failures # If Salt sends us only an array (or string), it's a list of fatal failures
else
logger.debug "Processing normal report data"
process_normal
end
if @host.save(validate: true)
logger.debug "Host #{@host.name} saved successfully"
else
logger.warn "Host #{@host.name} failed to save: #{@host.errors.full_messages.join(', ')}"
end
@host.reload
@host.refresh_statuses([HostStatus.find_status_by_humanized_name('configuration')])
duration = (Time.zone.now - start_time).round(2)
logger.info("Imported report for #{@host} in #{duration} seconds")
rescue => e
logger.error "Failed to import report for #{@host}: #{e.message}"
raise e
end
private
def find_or_create_host(host)
logger.debug "Finding or creating host: #{host}"
@host ||= Host::Managed.find_by(name: host)
unless @host
logger.info "Host not found; creating new host with name: #{host}"
new_host = Host::Managed.new(name: host)
if new_host.save(validate: true)
logger.debug "New host #{host} created successfully"
@host = new_host
else
logger.error "Failed to create host #{host}: #{new_host.errors.full_messages.join(', ')}"
raise ::Foreman::Exception, _('Failed to create host')
end
end
@host
rescue => e
logger.error "Error in find_or_create_host: #{e.message}"
raise e
end
def import_log_messages
logger.debug "Importing log messages"
@raw.each do |resource, result|
level = determine_log_level(result)
source_value = resource.to_s
source = Source.find_or_create_by(value: source_value)
logger.debug "Log source: #{source_value}"
message_value = extract_message(result)
message = Message.find_or_create_by(value: message_value)
logger.debug "Log message: #{message_value}"
Log.create(message_id: message.id, source_id: source.id, report: @report, level: level)
logger.debug "Log entry created with level #{level} for resource #{resource}"
end
rescue => e
logger.error "Error importing log messages: #{e.message}"
raise e
end
def calculate_metrics
logger.debug "Calculating metrics from raw data"
success = 0
failed = 0
changed = 0
restarted = 0
restarted_failed = 0
pending = 0
time = {}
@raw.each do |resource, result|
next unless result.is_a?(Hash)
logger.debug "Processing resource: #{resource}"
if result['result'] == true
success += 1
logger.debug "Resource #{resource} succeeded"
          if resource.match(/^service_/) && result['comment'].to_s.include?('restarted')
restarted += 1
logger.debug "Service #{resource} was restarted"
elsif result['changes'].present?
changed += 1
logger.debug "Resource #{resource} changed"
elsif result['pchanges'].present?
pending += 1
logger.debug "Resource #{resource} has pending changes"
end
elsif result['result'].nil?
pending += 1
logger.debug "Resource #{resource} is pending"
elsif result['result'] == false
          if resource.match(/^service_/) && result['comment'].to_s.include?('restarted')
restarted_failed += 1
logger.debug "Service #{resource} failed to restart"
else
failed += 1
logger.debug "Resource #{resource} failed"
end
end
duration = parse_duration(result['duration'])
time[resource] = duration || 0
logger.debug "Duration for resource #{resource}: #{time[resource]} seconds"
end
time[:total] = time.values.compact.sum || 0
logger.debug "Total execution time: #{time[:total]} seconds"
events = { total: changed + failed + restarted + restarted_failed, success: success + restarted, failure: failed + restarted_failed }
changes = { total: changed + restarted }
resources = {
'total' => @raw.size,
'applied' => changed,
'restarted' => restarted,
'failed' => failed,
'failed_restarts' => restarted_failed,
'skipped' => 0,
'scheduled' => 0,
'pending' => pending
}
logger.debug "Metrics calculated: events=#{events}, changes=#{changes}, resources=#{resources}"
{ events: events, resources: resources, changes: changes, time: time }
rescue => e
logger.error "Error calculating metrics: #{e.message}"
raise e
end
def process_normal
logger.debug "Processing normal report"
metrics = calculate_metrics
status = ConfigReportStatusCalculator.new(counters: metrics[:resources].slice(*::ConfigReport::METRIC)).calculate
logger.debug "Calculated status: #{status.inspect}"
@report = ConfigReport.new(host: @host, reported_at: start_time, status: status, metrics: metrics)
if @report.save
logger.debug "Report saved successfully for host #{@host.name}"
import_log_messages
else
logger.error "Failed to save report: #{@report.errors.full_messages.join(', ')}"
end
rescue => e
logger.error "Error processing normal report: #{e.message}"
raise e
end
def process_failures
logger.debug "Processing failure report"
@raw = [@raw] unless @raw.is_a?(Array)
status = ConfigReportStatusCalculator.new(counters: { 'failed' => @raw.size }).calculate
logger.debug "Calculated status for failures: #{status.inspect}"
@report = ConfigReport.create(host: @host, reported_at: Time.zone.now, status: status, metrics: {})
source = Source.find_or_create_by(value: 'Salt')
@raw.each do |failure|
message = Message.find_or_create_by(value: failure)
Log.create(message_id: message.id, source_id: source.id, report: @report, level: :err)
logger.debug "Logged failure message: #{failure}"
end
rescue => e
logger.error "Error processing failures: #{e.message}"
raise e
end
def start_time
@start_time ||= Time.zone.now
end
def determine_log_level(result)
if result['changes'].blank? && result['result'] == true
:info
elsif result['result'] == false
:err
else
:notice
end
end
def extract_message(result)
if result['changes'] && result['changes']['diff']
result['changes']['diff']
elsif result['pchanges'] && result['pchanges']['diff']
result['pchanges']['diff']
elsif result['comment'].present?
result['comment']
else
'No message available'
end
end
def parse_duration(duration)
if duration.is_a?(String)
duration_in_ms = duration.delete(' ms')
Float(duration_in_ms) / 1000
else
duration.to_f / 1000
end
rescue
logger.warn "Unable to parse duration: #{duration}"
nil
end
end
end
Thank you both for your work here. I don't know if you realize how awesome this is. With Broadcom buying out VMware, VMware is raising its prices significantly for the SaltStack UI (aka vRealize Automation Standard Plus). We have some 70,000+ Salt minions/hosts we will be using in Foreman, saving us over $7.5M a year!!! So bless you both for all of this work.