Source code for amber_meta.amber_run

from __future__ import division, print_function
import os
import argparse
import subprocess
from .amber_options import AmberOptions
from .amber_configuration import AmberConfiguration
from .amber_utils import (
    get_root_name,
    get_full_output_path_and_file,
    get_scenario_file_from_root_yaml_base_dict,
    get_filterbank_header,
    get_nbatch,
    pretty_print_command,
    check_path_ends_with_slash,
    check_directory_exists,
    parse_scenario_to_dictionary,
    create_rfim_configuration_thresholds
)
from .amber_results import (
    read_amber_run_results,
    run_arts_analysis_triggers,
    run_arts_analysis_tools_against_ground_truth
)

AMBER_SETUP_PATH = '/home/vohl/AMBER_setup/'

"""
.. module:: amber_run.rst
   :platform: Unix, Windows
   :synopsis: Module to run amber

.. moduleauthor:: D. Vohl <vohl@astron.nl>


"""


[docs]def create_amber_command(base_name='scenario_3_partitions', input_file='data/filterbank/file.fil', scenario_file='$SOURCE_ROOT/scenario/3_dms_partitions/scenario_3_partitions_step1.sh', config_path='$SOURCE_ROOT/install/scenario_3_partitions_step1/', rfim=True, rfim_mode='time_domain_sigma_cut', rfim_threshold_tdsc=None, rfim_threshold_fdsc=None, snr_mode='snr_mom_sigmacut', input_data_mode='sigproc', cpu_id=1, snrmin=10, output_dir='$OUTPUT_ROOT/results/', verbose=True, root_name=None): """Launch amber. Creates an amber launch command to be run with subprocess. Parameters ---------- base_name : str Base name. input_file : str Intput filterbank file. scenario_file : str Scenario file (including path) config_path : str Path of configuration files rfim : bool Use RFI mitigation or not. rfim_mode : str RFI mitigation mode. Choices: [time_domain_sigma_cut | frequency_domain_sigma_cut] rfim_threshold : str Override rfim threshold value. Default: None snr_mode : str SNR mode. Choices: [snr_standard | snr_momad | snr_mom_sigmacut] input_data_mode : str Input data mode. Choices: [sigproc | data] cpu_id : int CPU id for process and GPU. snrmin : int Minimum SNR for outlier detection. output_dir : str Output directory. verbose : bool Print extra information at runtime. root_name : str Root name used for output. """ if verbose: print("Scenario:", scenario_file) print() # Get scenario scenario_dict = parse_scenario_to_dictionary(scenario_file) # Format input to correspond to behaviour we want config_path = check_path_ends_with_slash(config_path) # COLLECT INFO FROM FILTERBANK header, header_size = get_filterbank_header(input_file, verbose=verbose) nbatch = get_nbatch(input_file, header, header_size, int(scenario_dict['SAMPLES']), verbose=verbose) # Get amber's INSTALL_ROOT variable state conf_dir_base = os.environ['INSTALL_ROOT'] downsampling = (int(scenario_dict['downsampling'.upper()]) != 1) # Pin down amber's to cpu id 'cpu_id' command = ['taskset', '-c', str(cpu_id), 'amber'] amber_options = AmberOptions(rfim=rfim, rfim_mode=rfim_mode, snr_mode=snr_mode, input_data_mode=input_data_mode, downsampling=downsampling) amber_configs = AmberConfiguration(rfim=rfim, rfim_mode=rfim_mode, downsampling=downsampling) # Check that output directory exists. If not, create it. check_directory_exists(output_dir) for option in amber_options.options: # First add the option with a dash (e.g. -opencl_platform) command.append('-' + option) # Then try to fill the input, if applicable if '_file' in option: command.append(config_path + option.split('_file')[0] + '.conf') elif option == 'downsampling': pass # Do not pass any option (TODO: fix naming rule issue with downsampling_configuration) elif option in ['time_domain_sigma_cut_steps', 'time_domain_sigma_cut_configuration']: command.append( "%s%s%s%s" % ( config_path, amber_configs.configurations[rfim_mode][option], "" if rfim_threshold_tdsc in [None, 'None'] else "%s%s" % ('_threshold_', rfim_threshold_tdsc), amber_configs.suffix ) ) elif option in ['frequency_domain_sigma_cut_steps', 'frequency_domain_sigma_cut_configuration']: command.append( "%s%s%s%s" % ( config_path, amber_configs.configurations[rfim_mode][option], "" if rfim_threshold_fdsc in [None, 'None'] else "%s%s" % ('_threshold_', rfim_threshold_fdsc), amber_configs.suffix ) ) elif option in ['downsampling_configuration', 'integration_steps', 'zapped_channels']: command.append( "%s%s%s" % ( config_path, amber_configs.configurations[option], amber_configs.suffix ) ) elif option == 'nr_bins': command.append(scenario_dict['RFIM_FDSC_BINS']) elif option == 'downsampling_factor': command.append(scenario_dict['DOWNSAMPLING']) elif option == 'threshold': command.append(str(snrmin)) elif option == 'data': command.append(input_file) elif option == 'output': if rfim_threshold_tdsc != None and rfim_threshold_fdsc != None: if rfim_mode == "both__tdsc_fdsc": root_name_str = "%s_thresholds_tdsc_%s_fdsc_%s" % (root_name, rfim_threshold_tdsc, rfim_threshold_fdsc) else: root_name_str = "%s_thresholds_fdsc_%s_tdsc_%s" % (root_name, rfim_threshold_fdsc, rfim_threshold_tdsc) elif rfim_threshold_tdsc != None: root_name_str = "%s_threshold_%s" % (root_name, rfim_threshold_tdsc) elif rfim_threshold_fdsc != None: root_name_str = "%s_threshold_%s" % (root_name, rfim_threshold_fdsc) else: root_name_str = root_name command.append( get_full_output_path_and_file( output_dir, base_name, root_name=root_name_str, cpu_id=cpu_id ) ) elif option == 'header': command.append(str(header_size)) elif option == 'batches': command.append(str(nbatch)) else: try: command.append(scenario_dict[option.upper()]) except KeyError: # This option doesn't require any input pass return command
[docs]def run_amber_from_yaml_root(input_yaml_file, root='subband', rfim_threshold_override=False, rfim_threshold_tdsc='3.25', rfim_threshold_fdsc='2.50', verbose=False, print_only=True, detach_completely=True): """Run amber starting from a yaml root scenario file. Launches a amber scenario where each step is run as independent sub-processes. Parameters ---------- input_yaml_file : str Input filename with .yaml or .yml extension. root : str Name of root scenario in input yaml. verbose : bool Print extra information at runtime. print_only : bool Only print command, do not launch them. detach_completely : bool If True, launch all processes and detach from them. Else, wait on last cpu. """ assert input_yaml_file.split('.')[-1] in ['yaml', 'yml'] base = parse_scenario_to_dictionary(input_yaml_file)[root] root_name = get_root_name(input_yaml_file) if verbose: print('ROOT_NAME', root_name) if verbose: print(base) if rfim_threshold_override is None: if 'rfim_threshold' in base: if base['rfim_threshold'] != None: #check_file_exists(base[]) # Should check if the file already exists... create_rfim_configuration_threshold_from_yaml_root(input_yaml_file, root=root, rfim_threshold_tdsc=base['rfim_threshold_tdsc'], rfim_threshold_fdsc=base['rfim_threshold_fdsc'], threshold_=base['rfim_threshold'], verbose=verbose, print_only=print_only) else: create_rfim_configuration_threshold_from_yaml_root(input_yaml_file, root=root, rfim_threshold_tdsc=rfim_threshold_tdsc, rfim_threshold_fdsc=rfim_threshold_fdsc, verbose=verbose, print_only=print_only) for cpu_id in range(base['n_cpu']): command = create_amber_command( base_name=base['base_name'], input_file=base['input_file'], scenario_file=get_scenario_file_from_root_yaml_base_dict(base, cpu_id), config_path='%s%s' % ( check_path_ends_with_slash(base['base_config_path']), check_path_ends_with_slash(base['config_repositories'][cpu_id]), ), rfim=base['rfim'], rfim_mode=base['rfim_mode'], rfim_threshold_tdsc=base['rfim_threshold_tdsc'] if rfim_threshold_tdsc is None else rfim_threshold_tdsc, rfim_threshold_fdsc=base['rfim_threshold_fdsc'] if rfim_threshold_fdsc is None else rfim_threshold_fdsc, snr_mode=base['snr_mode'], input_data_mode=base['input_data_mode'], cpu_id=cpu_id, snrmin=base['snrmin'], output_dir=base['output_dir'], root_name=root_name ) if verbose: pretty_print_command(command) if not print_only: if detach_completely: # Launch amber, and detach from the process so it runs by itself subprocess.Popen(command, preexec_fn=os.setpgrp) else: if cpu_id != base['n_cpu']-1: subprocess.Popen(command, preexec_fn=os.setpgrp) else: subprocess.call(command)
[docs]def run_amber_from_yaml_root_override_threshold(input_basename='yaml/root/root', root='subband', threshold='2.00', verbose=False, print_only=False, detach_completely=True): """Run amber from a yaml root file and override threshold for RFIm input_basename : str Default: 'yaml/root/root' root : str Default: 'subband', threshold : str Default: '2.00' verbose : bool Print extra information at runtime. Default: False. print_only : bool Only print command, do not launch them. Default: False. """ run_amber_from_yaml_root( '%s_%s.yaml' % ( input_basename, threshold ), root=root, verbose=verbose, print_only=print_only, detach_completely=detach_completely )
[docs]def run_amber_from_yaml_root_override_thresholds(input_basename='yaml/root/root', root='subband', thresholds_tdsc = ['3.25'], thresholds_fdsc = ['2.00', '2.25', '2.50', '2.698', '2.75'], verbose=False, print_only=False, detach_completely=False): """Run amber from a yaml root file and for multiple overriden threshold for RFIm input_basename : str Default: 'yaml/root/root' root : str Default: 'subband', thresholds : list Default: ['2.00', '2.50', '3.00', '3.50', '4.00', '4.50', '5.00'] verbose : bool Print extra information at runtime. Default: False. print_only : bool Only print command, do not launch them. Default: False. """ for threshold_tdsc in thresholds_tdsc: for threshold_fdsc in thresholds_fdsc: run_amber_from_yaml_root( '%s.yaml' % ( input_basename ), rfim_threshold_tdsc=threshold_tdsc, rfim_threshold_fdsc=threshold_fdsc, root=root, verbose=verbose, print_only=print_only, detach_completely=detach_completely )
[docs]def create_rfim_configuration_threshold_from_yaml_root(input_yaml_file, root='subband', rfim_threshold_tdsc='3.25', rfim_threshold_fdsc='2.50', verbose=False, print_only=False): """Create RFIm configuration file starting from with a yaml root Parameters ---------- input_yaml_file : str Input root yaml file root : str Root value of the yaml file. Default: 'subband' threshold : list New threshold file to be generated. Default: '2.50', verbose : bool Print extra information at runtime. Default: False. print_only : bool Only print command, do not launch them. Default: False. """ assert input_yaml_file.split('.')[-1] in ['yaml', 'yml'] base = parse_scenario_to_dictionary(input_yaml_file)[root] root_name = get_root_name(input_yaml_file) for cpu_id in range(base['n_cpu']): create_rfim_configuration_thresholds( config_path='%s%s' % ( check_path_ends_with_slash(base['base_config_path']), check_path_ends_with_slash(base['config_repositories'][cpu_id]), ), rfim_mode=base['rfim_mode'], original_threshold_tdsc='2.50', # This is a bit dumb, but will work for now... original_threshold_fdsc='2.50', # This is a bit dumb, but will work for now... new_threshold_tdsc=rfim_threshold_tdsc, new_threshold_fdsc=rfim_threshold_fdsc, duplicate=True, verbose=verbose, print_only=print_only )
[docs]def create_rfim_configuration_thresholds_from_yaml_root(input_yaml_file, root='subband', thresholds = ['2.00', '2.50', '3.00', '3.50', '4.00', '4.50', '5.00'], verbose=False, print_only=False): """Create RFIm configuration files starting from with a yaml root Parameters ---------- input_yaml_file : str Input root yaml file root : str Root value of the yaml file. Default: 'subband' thresholds : list Thresholds files to be generated. Default: ['2.00', '2.50', '3.00', '3.50', '4.00', '4.50', '5.00'], verbose : bool Print extra information at runtime. Default: False. print_only : bool Only print command, do not launch them. Default: False. """ for threshold in thresholds: create_rfim_configuration_threshold_from_yaml_root(input_yaml_file, root=root, threshold=threshold, verbose=verbose, print_only=print_only)
def make_plots_for_rfim_thresholds(input_basename='yaml/root/root', thresholds=['2.00', '2.50', '3.00', '3.50', '4.00', '4.50', '5.00'], triggers=True, tools=True, custom_name_base=None, truth_file=None, truth_name='Truth', detach=True, verbose=True, invert_order=False, print_only=True): for threshold in thresholds: if triggers: run_arts_analysis_triggers( '%s.yaml' % input_basename, threshold=threshold, detach=detach, verbose=verbose, print_only=print_only ) if tools: run_arts_analysis_tools_against_ground_truth( '%s.yaml' % input_basename, threshold=threshold, custom_name_base=custom_name_base, truth_file=truth_file, truth_name=truth_name, detach=detach, verbose=verbose, invert_order=invert_order, print_only=print_only )
[docs]def test_amber_run(input_file='data/dm100.0_nfrb500_1536_sec_20190214-1542.fil', n_cpu=3, base_name='tuning_halfrate_3GPU_goodcentralfreq', base_scenario_path='/home/vohl/software/AMBER/scenario/', scenario_files=['tuning_1.sh', 'tuning_2.sh', 'tuning_3.sh'], snrmin=8, base_config_path='$SOURCE_ROOT/configuration/', config_repositories=[ 'tuning_halfrate_3GPU_goodcentralfreq_step1', 'tuning_halfrate_3GPU_goodcentralfreq_step2', 'tuning_halfrate_3GPU_goodcentralfreq_step3' ], rfim=True, rfim_mode='time_domain_sigma_cut', snr_mode='snr_mom_sigmacut', input_data_mode='sigproc', verbose=True, print_only=False): """Test amber. Creates three amber jobs. Parameters ---------- amber_mode : str input_file : str n_cpu : int base_name : str base_scenario_path : str scenario_files : list snrmin : int base_config_path : str config_repositories : list rfim : bool rfim_mode : str snr_mode : str input_data_mode : str verbose : bool Print extra information at runtime. print_only : bool Only print the command without launching it. """ for cpu_id in range(n_cpu): command = create_amber_command( base_name=base_name, input_file=input_file, scenario_file='%s%s%s' % ( check_path_ends_with_slash(base_scenario_path), check_path_ends_with_slash(base_name), scenario_files[cpu_id], ), config_path='%s%s' % ( check_path_ends_with_slash(base_config_path), check_path_ends_with_slash(config_repositories[cpu_id]), ), rfim=rfim, rfim_mode=rfim_mode, snr_mode=snr_mode, input_data_mode=input_data_mode, cpu_id=cpu_id, snrmin=snrmin ) if verbose: pretty_print_command(command) if not print_only: # Launch amber, and detach from the process so it runs by itself subprocess.Popen(command, preexec_fn=os.setpgrp)
[docs]def get_amber_run_results_from_root_yaml(input_yaml_file, root='subband', verbose=False): """Run amber starting from a yaml root scenario file. Launches a amber scenario where each step is run as independent sub-processes. Parameters ---------- input_yaml_file : str Accepted format are .yaml and .yml root : str Name of root scenario in input yaml. verbose : bool Print extra information at runtime. """ assert input_yaml_file.split('.')[-1] in ['yaml', 'yml'] base = parse_scenario_to_dictionary(input_yaml_file)[root] root_name = get_root_name(input_yaml_file) if verbose: print(base) full_output_dir = check_path_ends_with_slash(base['output_dir']) + root_name return read_amber_run_results(full_output_dir, verbose=verbose)
[docs]def tune_amber(scenario_file='/home/vohl/software/AMBER/scenario/tuning_step1.sh', config_path='/home/vohl/software/AMBER/configuration/tuning_step1', verbose=True, print_only=True): """Tune amber. Tune amber based on a scenario file. The output is save to config_path. Parameters ---------- scenario_file : str config_path : str """ # Format input to correspond to behaviour we want config_path = check_path_ends_with_slash(config_path) # Define command command = [AMBER_SETUP_PATH + 'amber.sh', 'tune', scenario_file, config_path] if verbose: pretty_print_command(command) if not print_only: # Launch amber tuning, and detach from the process so it runs by itself subprocess.Popen(command, preexec_fn=os.setpgrp)
# os.system([for c in command print (c),][0] + '&' )
[docs]def test_tune(base_scenario_path='/home/vohl/software/AMBER/scenario/', base_name='tuning_halfrate_3GPU_goodcentralfreq', scenario_files=['tuning_1.sh', 'tuning_2.sh', 'tuning_3.sh'], config_path='/home/vohl/software/AMBER/configuration/', verbose=True, print_only=True): """Test tuning amber. Launch tune_amber for three scenarios. Parameters base_scenario_path : str base_name : str scenario_files : list config_path : str """ base_scenario_path = check_path_ends_with_slash(base_scenario_path) i = 1 for file in scenario_files: input_file = "%s%s%s%s%s" % ( base_scenario_path, base_name, '/tuning_', i, '.sh' ) output_dir = config_path + base_name + '_step' + str(i) output_dir = check_directory_exists(output_dir) tune_amber(input_file, output_dir, verbose=verbose, print_only=print_only) i += 1
if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--verbose', help="Print amber commands", type=bool, default=True) parser.add_argument('--print_only', help="If True, only prints amber commands. If False, also launch the jobs.", type=bool, default=False) args = parser.parse_args() test_amber_run(verbose=args.verbose, print_only=args.print_only)