diff --git a/.travis.yml b/.travis.yml
index 56b7375..97ef193 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,7 +15,7 @@ before_install:
 install:
 - pip install .
 script:
-- travis_wait 30 nosetests
+- travis_wait 40 nosetests
 deploy:
   provider: pypi
   distributions: sdist bdist_wheel
diff --git a/pydream/Dream.py b/pydream/Dream.py
index 81e34ec..de88d31 100644
--- a/pydream/Dream.py
+++ b/pydream/Dream.py
@@ -5,8 +5,8 @@
 from . import Dream_shared_vars
 from datetime import datetime
 import traceback
-import multiprocessing as mp
-from multiprocessing import pool
+import multiprocess as mp
+from multiprocess import pool
 import time
 
 class Dream():
@@ -70,7 +70,13 @@ def __init__(self, model, variables=None, nseedchains=None, nCR=3, adapt_crossov
         self.mp_context = mp_context
         # Set model and variable attributes (if no variables passed, set to all parameters)
         self.model = model
-        self.model_name = model_name
+        if not model_name:
+            # Replace `:` for `_` because colons cause problems in windows paths
+            prefix = datetime.now().strftime('%Y_%m_%d_%H_%M_%S_')
+        else:
+            prefix = model_name + '_'
+        self.model_name = prefix
+
         if variables is None:
             self.variables = self.model.sampled_parameters
         else:
@@ -937,12 +943,8 @@ def record_history(self, nseedchains, ndimensions, q_new, len_history):
         Dream_shared_vars.count.value += 1
 
         if self.save_history and len_history == (nhistoryrecs+1)*ndimensions:
-            if not self.model_name:
-                prefix = datetime.now().strftime('%Y_%m_%d_%H:%M:%S')+'_'
-            else:
-                prefix = self.model_name+'_'
             
-            self.save_history_to_disc(np.frombuffer(Dream_shared_vars.history.get_obj()), prefix)
+            self.save_history_to_disc(np.frombuffer(Dream_shared_vars.history.get_obj()), self.model_name)
 
     def save_history_to_disc(self, history, prefix):
         """Save history and crossover probabilities to files at end of run.
@@ -1015,7 +1017,7 @@ def daemon(self, val):
         pass
 
 
-from multiprocessing import context
+from multiprocess import context
 
 
 # Exists on all platforms
diff --git a/pydream/core.py b/pydream/core.py
index baf8a37..574b6b0 100644
--- a/pydream/core.py
+++ b/pydream/core.py
@@ -1,7 +1,8 @@
 # -*- coding: utf-8 -*-
 
 import numpy as np
-import multiprocessing as mp
+import collections
+import multiprocess as mp
 from . import Dream_shared_vars
 from .Dream import Dream, DreamPool
 from .model import Model
@@ -27,6 +28,9 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
         Whether run is a continuation of an earlier run. Pass this with the model_name argument to automatically load previous history and crossover probability files. Default: False
     verbose: Boolean, optional
         Whether to print verbose output (including acceptance or rejection of moves and the current acceptance rate). Default: True
+    nverbose: int, optional
+        Rate at which the acceptance rate is printed if verbose is set to True. Every n-th iteration the acceptance rate
+        will be printed and added to the acceptance rate file. Default: 10
     tempering: Boolean, optional
         Whether to use parallel tempering for the DREAM chains. Warning: this feature is untested. Use at your own risk! Default: False
     mp_context: multiprocessing context or None.
@@ -53,15 +57,24 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
         parameters = [parameters]
 
     model = Model(likelihood=likelihood, sampled_parameters=parameters)
-    
+
     if restart:
+        model_prefix = kwargs['model_name']
         step_instance = Dream(model=model, variables=parameters,
-                              history_file=kwargs['model_name'] + '_DREAM_chain_history.npy',
-                              crossover_file=kwargs['model_name'] + '_DREAM_chain_adapted_crossoverprob.npy',
-                              gamma_file=kwargs['model_name'] + '_DREAM_chain_adapted_gammalevelprob.npy',
+                              history_file=model_prefix + '_DREAM_chain_history.npy',
+                              crossover_file=model_prefix + '_DREAM_chain_adapted_crossoverprob.npy',
+                              gamma_file=model_prefix + '_DREAM_chain_adapted_gammalevelprob.npy',
                               verbose=verbose, mp_context=mp_context, **kwargs)
+
+        # Reload acceptance rate data
+        chains_naccepts_iterations = []
+        for chain in range(nchains):
+            na = np.load(f'{model_prefix}_naccepts_chain{chain}.npy')
+            chains_naccepts_iterations.append(na)
+
     else:
         step_instance = Dream(model=model, variables=parameters, verbose=verbose, mp_context=mp_context, **kwargs)
+        chains_naccepts_iterations = [np.zeros((2, 1), dtype=np.int)] * nchains
 
     pool = _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=start, mp_context=mp_context)
     try:
@@ -71,15 +84,20 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
 
         else:
-            if type(start) is list:
-                args = zip([step_instance]*nchains, [niterations]*nchains, start, [verbose]*nchains, [nverbose]*nchains)
-            
-            else:
-                args = list(zip([step_instance]*nchains, [niterations]*nchains, [start]*nchains, [verbose]*nchains, [nverbose]*nchains))
+            if not isinstance(start, collections.abc.Iterable):
+                start = [start] * nchains
+            args = list(zip([step_instance] * nchains, [niterations] * nchains, start, [verbose] * nchains,
+                            [nverbose]*nchains, list(range(nchains)), chains_naccepts_iterations))
 
             returned_vals = pool.map(_sample_dream, args)
             sampled_params = [val[0] for val in returned_vals]
             log_ps = [val[1] for val in returned_vals]
+            acceptance_rates = [val[2] for val in returned_vals]
+
+            for chain in range(nchains):
+                filename = f'{step_instance.model_name}acceptance_rates_chain{chain}.txt'
+                with open(filename, 'ab') as f:
+                    np.savetxt(f, acceptance_rates[chain])
 
     finally:
         pool.close()
         pool.join()
@@ -88,21 +106,31 @@ def run_dream(parameters, likelihood, nchains=5, niterations=50000, start=None,
 
 
 def _sample_dream(args):
-    try: 
+    try:
         dream_instance = args[0]
         iterations = args[1]
         start = args[2]
         verbose = args[3]
         nverbose = args[4]
+        chain_idx = args[5]
+        naccepts_iterations_total = args[6]
         step_fxn = getattr(dream_instance, 'astep')
         sampled_params = np.empty((iterations, dream_instance.total_var_dimension))
         log_ps = np.empty((iterations, 1))
+        acceptance_rates_size = int(np.floor(iterations / nverbose))
+        if acceptance_rates_size == 0:
+            acceptance_rates_size = 1
+        acceptance_rates = np.zeros(acceptance_rates_size)
         q0 = start
-        naccepts = 0
+        iterations_total = np.sum(naccepts_iterations_total[1])
+        naccepts = naccepts_iterations_total[0][-1]
         naccepts100win = 0
-        for iteration in range(iterations):
+        acceptance_counter = 0
+        for iteration_idx, iteration in enumerate(range(iterations_total, iterations_total + iterations)):
             if iteration%nverbose == 0:
                 acceptance_rate = float(naccepts)/(iteration+1)
+                acceptance_rates[acceptance_counter] = acceptance_rate
+                acceptance_counter += 1
                 if verbose:
                     print('Iteration: ',iteration,' acceptance rate: ',acceptance_rate)
             if iteration%100 == 0:
@@ -111,47 +139,50 @@ def _sample_dream(args):
                     print('Iteration: ',iteration,' acceptance rate over last 100 iterations: ',acceptance_rate_100win)
                 naccepts100win = 0
             old_params = q0
-            sampled_params[iteration], log_prior , log_like = step_fxn(q0)
-            log_ps[iteration] = log_like + log_prior
-            q0 = sampled_params[iteration]
+            sampled_params[iteration_idx], log_prior, log_like = step_fxn(q0)
+            log_ps[iteration_idx] = log_like + log_prior
+            q0 = sampled_params[iteration_idx]
             if old_params is None:
                 old_params = q0
             if np.any(q0 != old_params):
                 naccepts += 1
                 naccepts100win += 1
-    
+
+        naccepts_iterations_total = np.append(naccepts_iterations_total, np.array([[naccepts], [iterations]]), axis=1)
+        np.save(f'{dream_instance.model_name}naccepts_chain{chain_idx}.npy', naccepts_iterations_total)
+
     except Exception as e:
         traceback.print_exc()
         print()
         raise e
-    return sampled_params, log_ps
+    return sampled_params, log_ps, acceptance_rates
 
 
 
 def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
-    
+
     T = np.zeros((nchains))
     T[0] = 1.
     for i in range(nchains):
         T[i] = np.power(.001, (float(i)/nchains))
-    
-    step_instances = [step_instance]*nchains
-    
+
+    step_instances = [step_instance]*nchains
+
     if type(start) is list:
         args = list(zip(step_instances, start, T, [None]*nchains, [None]*nchains))
     else:
         args = list(zip(step_instances, [start]*nchains, T, [None]*nchains, [None]*nchains))
-    
+
     sampled_params = np.zeros((nchains, niterations*2, step_instance.total_var_dimension))
     log_ps = np.zeros((nchains, niterations*2, 1))
-    
+
     q0 = start
     naccepts = np.zeros((nchains))
     naccepts100win = np.zeros((nchains))
     nacceptsT = np.zeros((nchains))
     nacceptsT100win = np.zeros((nchains))
     ttestsper100 = 100./nchains
-    
+
     for iteration in range(niterations):
         itidx = iteration*2
         if iteration%10 == 0:
@@ -176,8 +207,8 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
         logprinews = [val[1] for val in returned_vals]
         loglikenews = [val[2] for val in returned_vals]
         dream_instances = [val[3] for val in returned_vals]
-        logpnews = [T[i]*loglikenews[i] + logprinews[i] for i in range(nchains)]
-        
+        logpnews = [T[i]*loglikenews[i] + logprinews[i] for i in range(nchains)]
+
         for chain in range(nchains):
             sampled_params[chain][itidx] = qnews[chain]
             log_ps[chain][itidx] = logpnews[chain]
@@ -189,17 +220,16 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
         T2 = T[random_chains[1]]
         logp1 = logpnews[random_chains[0]]
         logp2 = logpnews[random_chains[1]]
-        
-        
+
         alpha = ((T1*loglike2)+(T2*loglike1))-((T1*loglike1)+(T2*loglike2))
-        
+
         if np.log(np.random.uniform()) < alpha:
             if verbose:
                 print('Accepted temperature swap of chains: ',random_chains,' at temperatures: ',T1,' and ',T2,' and logps: ',logp1,' and ',logp2)
             nacceptsT[random_chains[0]] += 1
             nacceptsT[random_chains[1]] += 1
             nacceptsT100win[random_chains[0]] += 1
-            nacceptsT100win[random_chains[1]] += 1           
+            nacceptsT100win[random_chains[1]] += 1
             old_qs = list(qnews)
             old_logps = list(logpnews)
             old_loglikes = list(loglikenews)
@@ -215,11 +245,11 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
         else:
             if verbose:
                 print('Did not accept temperature swap of chains: ',random_chains,' at temperatures: ',T1,' and ',T2,' and logps: ',logp1,' and ',logp2)
-        
+
         for chain in range(nchains):
             sampled_params[chain][itidx+1] = qnews[chain]
             log_ps[chain][itidx+1] = logpnews[chain]
-        
+
         for i, q in enumerate(qnews):
             try:
                 if not np.all(q == q0[i]):
@@ -228,12 +258,12 @@ def _sample_dream_pt(nchains, niterations, step_instance, start, pool, verbose):
             except TypeError:
                 #On first iteration without starting points this will fail because q0 == None
                 pass
-        
+
         args = list(zip(dream_instances, qnews, T, loglikenews, logprinews))
         q0 = qnews
-    
+
     return sampled_params, log_ps
-    
+
 
 
 def _sample_dream_pt_chain(args):
@@ -244,11 +274,11 @@ def _sample_dream_pt_chain(args):
     last_logpri = args[4]
     step_fxn = getattr(dream_instance, 'astep')
     q1, logprior1, loglike1 = step_fxn(start, T, last_loglike, last_logpri)
-    
+
     return q1, logprior1, loglike1, dream_instance
 
 def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_context=None):
-    
+
     min_njobs = (2*len(step_instance.DEpairs))+1
     if nchains < min_njobs:
         raise Exception('Dream should be run with at least (2*DEpairs)+1 number of chains. For current algorithmic settings, set njobs>=%s.' %str(min_njobs))
@@ -266,12 +296,12 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_
         arr_dim = ((np.floor(nchains*niterations/step_instance.history_thin)+nchains)*step_instance.total_var_dimension)+(step_instance.nseedchains*step_instance.total_var_dimension)
     else:
         arr_dim = np.floor(((nchains*niterations/step_instance.history_thin)*step_instance.total_var_dimension))+(step_instance.nseedchains*step_instance.total_var_dimension)
-    
+
     min_nseedchains = 2*len(step_instance.DEpairs)*nchains
-    
+
     if step_instance.nseedchains < min_nseedchains:
         raise Exception('The size of the seeded starting history is insufficient. Increase nseedchains>=%s.' %str(min_nseedchains))
-    
+
     current_position_dim = nchains*step_instance.total_var_dimension
     # Get context to define arrays
     if mp_context is None:
@@ -295,10 +325,10 @@ def _setup_mp_dream_pool(nchains, niterations, step_instance, start_pt=None, mp_
     shared_nchains = ctx.Value('i', nchains)
     n = ctx.Value('i', 0)
     tf = ctx.Value('c', b'F')
-    
+
     if step_instance.crossover_burnin == None:
         step_instance.crossover_burnin = int(np.floor(niterations/10))
-    
+
     if start_pt != None:
         if step_instance.start_random:
             print('Warning: start position provided but random_start set to True. Overrode random_start value and starting walk at provided start position.')
diff --git a/pydream/tests/test_dream.py b/pydream/tests/test_dream.py
index 49532f7..6667588 100644
--- a/pydream/tests/test_dream.py
+++ b/pydream/tests/test_dream.py
@@ -7,8 +7,9 @@
 import unittest
 from os import remove
+from pathlib import Path
 
-import multiprocessing as mp
+import multiprocess as mp
 import numpy as np
 
 import pydream.Dream_shared_vars
 from pydream.Dream import Dream
@@ -477,6 +478,12 @@ def test_history_file_loading(self):
         remove('test_history_loading_DREAM_chain_adapted_crossoverprob.npy')
         remove('test_history_loading_DREAM_chain_adapted_gammalevelprob.npy')
         remove('test_history_loading_DREAM_chain_history.npy')
+
+        for p in Path(".").glob("test_history_loading_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("test_history_loading_acceptance_rates*.txt"):
+            p.unlink()
 
     def test_crossover_file_loading(self):
         """Test that when a crossover file is loaded the crossover values are set to the file values and not adapted."""
@@ -496,6 +503,12 @@ def test_crossover_file_loading(self):
         remove('testing_crossoverval_load_DREAM.npy')
         remove('testing_crossover_load_DREAM_chain_history.npy')
 
+        for p in Path(".").glob("testing_crossover_load_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("testing_crossover_load_acceptance_rates*.txt"):
+            p.unlink()
+
     def test_astep_onedmodel(self):
         """Test that a single step with a one-dimensional model returns values of the expected type and a move that is expected or not given the test logp."""
         """Test a single step with a one-dimensional model with a normal parameter."""
@@ -575,8 +588,10 @@ def test_mp_sampledreamfxn(self):
         start = np.array([-7, 8, 1.2, 0])
         verbose = False
         nverbose = 10
-        args = [dream, iterations, start, verbose, nverbose]
-        sampled_params, logps = _sample_dream(args)
+        chain_idx = 0
+        chains_naccepts_iterations = np.zeros((2, 1), dtype=np.int)
+        args = [dream, iterations, start, verbose, nverbose, chain_idx, chains_naccepts_iterations]
+        sampled_params, logps, acceptance_rates = _sample_dream(args)
 
         self.assertEqual(len(sampled_params), 10)
         self.assertEqual(len(sampled_params[0]), 4)
@@ -625,6 +640,28 @@ def test_mp_paralleltempering_sampledreamfxn(self):
         self.assertTrue((logprior_new + loglike_new) >= -400)
 
 class Test_Dream_Full_Algorithm(unittest.TestCase):
+    def test_model_with_restart(self):
+        """Test that pydream restarts."""
+        self.param, self.like = onedmodel()
+        model_name = 'test_history_correct'
+
+        sampled_params, logps = run_dream(self.param, self.like, niterations=10, nchains=5, save_history=True,
+                                          history_thin=1, model_name=model_name, adapt_crossover=False,
+                                          verbose=False)
+
+        starts = [sampled_params[chain][-1, :] for chain in range(5)]
+        sampled_params, logps = run_dream(self.param, self.like, niterations=10, nchains=5, save_history=True,
+                                          history_thin=1, model_name=model_name, adapt_crossover=False,
+                                          verbose=False, start=starts, restart=True)
+        remove('test_history_correct_DREAM_chain_history.npy')
+        remove('test_history_correct_DREAM_chain_adapted_crossoverprob.npy')
+        remove('test_history_correct_DREAM_chain_adapted_gammalevelprob.npy')
+
+        for p in Path(".").glob("test_history_correct_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("test_history_correct_acceptance_rates*.txt"):
+            p.unlink()
 
     def test_history_correct_after_sampling_simple_model(self):
         """Test that the history saved matches with the returned sampled parameter values for a one-dimensional test model."""
@@ -700,10 +737,17 @@ def test_boundaries_obeyed_aftersampling(self):
 
         self.assertFalse((reshaped_history<lowerbound).any())
         self.assertFalse((reshaped_history>upperbound).any())
 
+        # Remove reporting files
         remove('test_boundaries_DREAM_chain_adapted_crossoverprob.npy')
         remove('test_boundaries_DREAM_chain_adapted_gammalevelprob.npy')
         remove('test_boundaries_DREAM_chain_history.npy')
 
+        for p in Path(".").glob("test_boundaries_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("test_boundaries_acceptance_rates*.txt"):
+            p.unlink()
+
 
 class Test_DREAM_examples(unittest.TestCase):
@@ -727,6 +771,12 @@ def test_CORM_example(self):
         remove('corm_dreamzs_5chain_DREAM_chain_adapted_gammalevelprob.npy')
         remove('corm_dreamzs_5chain_DREAM_chain_history.npy')
 
+        for p in Path(".").glob("corm_dreamzs_5chain_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("corm_dreamzs_5chain_acceptance_rates*.txt"):
+            p.unlink()
+
     def test_mixturemodel_example(self):
         """Test that the mixture model example runs and returns values of the expected shape."""
         nchains = mix_kwargs['nchains']
@@ -767,6 +817,12 @@ def test_ndimgaussian_example(self):
         self.assertEqual(len(logps[0][0]), 1)
         remove('ndim_gaussian_seed.npy')
 
+        for p in Path(".").glob("ndim_gaussian_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("ndim_gaussian_acceptance_rates*.txt"):
+            p.unlink()
+
     def test_robertson_example(self):
         """Test that the Robertson example runs and returns values of the expected shape."""
         nchains = robertson_kwargs['nchains']
@@ -786,6 +842,12 @@ def test_robertson_example(self):
         self.assertEqual(len(logps[0]), 100)
         self.assertEqual(len(logps[0][0]), 1)
 
+        for p in Path(".").glob("robertson_dreamzs_5chain_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("robertson_dreamzs_5chain_acceptance_rates*.txt"):
+            p.unlink()
+
     def test_robertson_nopysb_example(self):
         """Test that the Robertson example without PySB runs and returns values of the expected shape."""
 
@@ -805,6 +867,12 @@ def test_robertson_nopysb_example(self):
         self.assertEqual(len(logps[0]), 100)
         self.assertEqual(len(logps[0][0]), 1)
 
+        for p in Path(".").glob("robertson_nopysb_dreamzs_5chain_naccepts*.npy"):
+            p.unlink()
+
+        for p in Path(".").glob("robertson_nopysb_dreamzs_5chain_acceptance_rates*.txt"):
+            p.unlink()
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/setup.py b/setup.py
index d10e045..23864ff 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@ def readme():
       author='Erin Shockley',
       author_email='erin.shockley@vanderbilt.edu',
       packages=['pydream'],
-      install_requires=['numpy', 'scipy'],
+      install_requires=['multiprocess', 'numpy', 'scipy'],
       zip_safe=False,
       test_suite='nose.collector',
       tests_require=['nose'],
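
Minimal usage sketch of the restart workflow this patch adds (not part of the diff: it mirrors the new test_model_with_restart test, and the normal prior and toy likelihood below are illustrative placeholders, not PyDREAM code):

    # Illustrative sketch only -- the prior and likelihood are placeholders.
    from scipy.stats import norm
    from pydream.parameters import SampledParam
    from pydream.core import run_dream

    params = [SampledParam(norm, loc=0, scale=1)]   # one sampled parameter

    def likelihood(position):
        # Toy log-likelihood for demonstration purposes
        return norm(loc=2, scale=1).logpdf(position).sum()

    # First run: with the patched code this writes <model_name>_naccepts_chain<i>.npy
    # and <model_name>_acceptance_rates_chain<i>.txt alongside the history files.
    sampled_params, logps = run_dream(params, likelihood, niterations=10, nchains=5,
                                      save_history=True, history_thin=1,
                                      model_name='restart_demo', verbose=False)

    # Restart: continue each chain from its last sample; the saved naccepts files
    # are reloaded so iteration counts and acceptance rates carry over.
    starts = [sampled_params[chain][-1, :] for chain in range(5)]
    sampled_params, logps = run_dream(params, likelihood, niterations=10, nchains=5,
                                      save_history=True, history_thin=1,
                                      model_name='restart_demo', verbose=False,
                                      start=starts, restart=True)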