Source code for aiida_yambo.workflows.utils.helpers_aiida_qe

# -*- coding: utf-8 -*-
"""Classes for calcs e wfls analysis. hybrid AiiDA and not_AiiDA...hopefully"""
from __future__ import absolute_import
import numpy as np
from scipy.optimize import curve_fit
from matplotlib import pyplot as plt, style
import pandas as pd
import copy

try:
    from aiida.orm import Dict, Str, load_node, KpointsData
    from aiida.plugins import CalculationFactory, DataFactory
except:
    pass

################################################################################
################################################################################

[docs]class calc_manager_aiida_qe: #the interface class to AiiDA... could be separated fro aiida and yambopy def __init__(self, calc_info={}): for key in calc_info.keys(): setattr(self, str(key), calc_info[key]) ################################## update_parameters #####################################
[docs] def updater(self, inp_to_update, k_distance, first): #parameter list? yambopy philosophy if self.var == 'bands': new_params = inp_to_update.yres.gw.parameters.get_dict() new_params['BndsRnXp'][-1] = new_params['BndsRnXp'][-1] + self.delta*first new_params['GbndRnge'][-1] = new_params['GbndRnge'][-1] + self.delta*first inp_to_update.yres.gw.parameters = Dict(new_params) value = new_params['GbndRnge'][-1] elif self.var == 'kpoints': k_distance = k_distance + self.delta*first inp_to_update.scf.kpoints = KpointsData() inp_to_update.scf.kpoints.set_cell(inp_to_update.scf.pw.structure.cell) inp_to_update.scf.kpoints.set_kpoints_mesh_from_density(1/k_distance, force_parity=True) inp_to_update.nscf.kpoints = inp_to_update.scf.kpoints try: del inp_to_update.parent_folder #I need to start from scratch... except: pass value = k_distance elif self.var == 'cutoff': new_params = inp_to_update.yres.gw.parameters.get_dict() new_params['CUTBox'] = new_params['CUTBox'] + [1,1,1]*self.delta*first inp_to_update.yres.gw.parameters = Dict(new_params) value = new_params['CUTBox'][-1] else: #"scalar" quantity new_params = inp_to_update.yres.gw.parameters.get_dict() new_params[str(self.var)] = new_params[str(self.var)] + self.delta*first inp_to_update.yres.gw.parameters = Dict(new_params) value = new_params[str(self.var)] return inp_to_update, value
################################## parsers #####################################
[docs] def take_quantities(self, start = 1): #yambopy philosophy? backtrace = self.steps #*self.iter what = self.what print('looking for {} in k-points {}'.format(what,where)) if what == 'etot': etot = np.zeros((calcs_info['steps']*calcs_info['iter'],3)) for i in range(1,calcs_info['steps']*calcs_info['iter']+1): pw_calc = load_node(calcs_info['wfl_pk']).caller.called[calcs_info['steps']*calcs_info['iter']-i].called[0] etot[i-1,1] = pw_calc.outputs.output_parameters.get_dict()['energy'] etot[i-1,0] = i*calcs_info['delta'] #number of the iteration times the delta... to be used in a fit etot[i-1,2] = int(pw_calc.pk) #calc responsible of the calculation return etot #delta etot better? elif what == 'structure': etot = np.zeros((calcs_info['steps']*calcs_info['iter'],3)) cells = np.zeros((calcs_info['steps']*calcs_info['iter'],3,3)) nr_atoms = load_node(calcs_info['wfl_pk']).caller.called[0].called[0].outputs.output_parameters.get_dict()['number_of_atoms'] atoms = np.zeros((calcs_info['steps']*calcs_info['iter'],nr_atoms,3)) for i in range(1,calcs_info['steps']*calcs_info['iter']+1): pw_calc = load_node(calcs_info['wfl_pk']).caller.called[calcs_info['steps']*calcs_info['iter']-i].called[0] etot[i-1,1] = pw_calc.outputs.output_parameters.get_dict()['energy'] etot[i-1,0] = i*calcs_info['delta'] #number of the iteration times the delta... to be used in a fit etot[i-1,2] = int(pw_calc.pk) #calc responsible of the calculation cells[i-1] = np.matrix(pw_calc.outputs.output_structure.cell) for j in range(nr_atoms): atoms[i-1][j] = np.array(pw_calc.outputs.output_structure.sites[j].position) return [etot, cells, atoms] #delta etot better?
######################### AiiDA specific #############################
[docs] def get_caller(self, calc, depth = 2): for i in range(depth): calc = load_node(calc).caller return calc
[docs] def get_called(self, calc, depth = 2): for i in range(depth): calc = load_node(calc).called[0] return calc
[docs] def start_from_converged(self, node, params): self.ctx.calc_inputs.pw.parameters = node.get_builder_restart().pw['parameters']
[docs] def set_relaxed_structure(self, last_ok): self.ctx.calc_inputs.pw.structure = last_ok.outputs.output_structure
[docs] def set_parent(self, last_ok): self.ctx.calc_inputs.pw.parent_folder = last_ok.called[0].outputs.remote_folder
[docs] def take_down(self, node = 0, what = 'CalcJobNode'): global calc_node if node == 0: node = load_node(self.wfl_pk) else: node = load_node(node) if what not in str(node.get_description): self.take_down(node.called[0]) else: calc_node = node return calc_node
[docs] def take_super(self, node = 0, what = 'WorkChainNode'): global workchain_node if node == 0: node = load_node(self.wfl_pk) else: node = load_node(node) if what not in str(node.get_description): self.take_super(node.caller) else: workchain_node = node return workchain_node