#!/usr/bin/bash
#
# Rolling restart of the local Elasticsearch cluster.
# Relies on local helpers: es-curl, es-maintenance (must be in PATH on launcher
# and on every cluster node).

# Disable pathname expansion for the whole script: several expansions below
# are deliberately left unquoted to be word-split (eg: --only/--skip lists).
set -f
PROGNAME=${0##*/}

# config: tunables, each one keeps its value from the environment when set
: "${WAIT_CONFIRM__ES_RESTART:=10}"
: "${WAIT_CONFIRM__REBOOT:=10}"
: "${WAIT_CONFIRM__CLUSTER_GREEN:=10}"
: "${DISK_CLOSE_MARGIN_PCT:=2}"

# options: values set by the command line parser (empty means "off")
UPDATE=
REBOOT=
BATCH=
DRY_RUN=1
DISK_CHECK=1
REPO_ELASTIC=elasticsearch
BASH_CMD=
NODE_BASH_CMD='ssh -T %h bash'
NODE_BASH_INIT=
# node name => node name, filled by --only and --skip
declare -A ONLY_NODES=() SKIP_NODES=()

# computed: cluster state gathered at run time
LOCAL_NODE=
MASTER_NODE=
# node name => IP, and node name => roles string
declare -A NODES=() NODES_ROLES=()
# LOW_WATERMARK is ( <type> <value> ); the two others are lists of node names
declare -a LOW_WATERMARK=() FILTERED_NODES=() ORDERED_NODES=()
# node name => disk statistics
declare -A NODES_DISK_PERCENT=() NODES_DISK_USED=()
declare -A NODES_DISK_AVAIL=() NODES_DISK_TOTAL=()

# info MESSAGE...: write a timestamped INFO line on stderr.
function info() {
    printf '%s INFO: %s: %s\n' "$(date +%Y-%m-%dT%H:%M:%S.%3N%:z)" "$PROGNAME" "$*" >&2
}
# warn MESSAGE...: write a timestamped WARNING line on stderr.
function warn() {
    printf '%s WARNING: %s: %s\n' "$(date +%Y-%m-%dT%H:%M:%S.%3N%:z)" "$PROGNAME" "$*" >&2
}
# error MESSAGE...: write a timestamped ERROR line on stderr.
function error() {
    printf '%s ERROR: %s: %s\n' "$(date +%Y-%m-%dT%H:%M:%S.%3N%:z)" "$PROGNAME" "$*" >&2
}
# fatal MESSAGE...: write a timestamped FATAL line on stderr, then exit 2.
function fatal() {
    printf '%s FATAL: %s: %s\n' "$(date +%Y-%m-%dT%H:%M:%S.%3N%:z)" "$PROGNAME" "$*" >&2
    exit 2
}

# exit_usage [STATUS]
# Print the help text then exit with STATUS (default 0). The text is written
# on stdout when STATUS is 0 and on stderr otherwise.
function exit_usage() {
    local status=${1:-0}
    [[ $status == 0 ]] || exec >&2
    cat <<EOF
Usage: $PROGNAME [OPTIONS]
Rolling restart of Elasticsearch cluster nodes.

Options:
    --update                Run dnf update of elasticsearch before restart
    --repo-elastic REPO     DNF/YUM repository used to update elasticsearch
                            Default: "$REPO_ELASTIC"
    --reboot                Reboot the node instead of just restarting the
                            elasticsearch service. With --update, the reboot
                            happens after the dnf update. The launcher node
                            is never rebooted (warning emitted)
    --no-dry-run            Dry-run is enabled by default, disable it
    --batch                 Non-interactive mode: fatal on node failure instead
                            of prompting to continue (default if stdin is not
                            a terminal)
    --only NODE[,NODE...]   Only process these nodes (repeatable)
    --skip NODE[,NODE...]   Skip these nodes (repeatable)
    --no-disk-space-check   Skip the low watermark preflight (nodes whose disk
                            usage is above or close to the low watermark)
    --bash-cmd CMD          Custom bash command to run this script instead of
                            local shebang, eg: ssh root@es1 bash
    --node-bash-cmd CMD     Custom bash command template used by this script
                            to execute code on cluster nodes; %h is replaced
                            by the node name, %i by the IP.
                            Default: "$NODE_BASH_CMD"
    --node-bash-init CODE   Remote shell initialization code
    -h, --help              Display this help
EOF
    exit "$status"
}

# wait-until [-m max-try[:confirm[:reset]]] command...
# Run command again and again until it succeeds. A progress line showing the
# counters and the quoted command is printed on stderr before each try.
#   max-try  return 1 once that many tries have failed while waiting for a
#            first success; default 30, empty or 0 means no limit
#   confirm  when set, a first success is not enough: the command is run
#            again and 0 is returned only once the confirm counter has
#            reached this value
#   reset    number of times a failure during the confirm phase may send the
#            loop back to the initial phase; default 2, empty or 0 means no
#            limit
# eg: "-m :10:" means no try limit, confirm=10 and no reset limit.
# Returns 0 on (confirmed) success, 1 when one of the limits is reached.
# Note: sleep 1 second between each try
function wait-until() {
    # explain is built before -m is shifted out, so it shows the -m option too
    local explain="$FUNCNAME ${*@Q}" try=1 max=30
    # step: 0 while waiting for a first success, non-zero in the confirm phase
    local step=0 last confirm_try= confirm_max= reset_try= reset_max=2
    [[ $1 == -m ]] && { max=$2; shift 2; }
    # split "max:confirm:reset" on the colons, left to right
    [[ $max == *:* ]] && { confirm_max=${max#*:}; max=${max%%:*}; }
    [[ $confirm_max == *:* ]] && { reset_max=${confirm_max#*:}; confirm_max=${confirm_max%%:*}; }
    while :; do
        # unset limits are displayed as "-"; the c: and r: counters only
        # appear once they have a value
        echo "[t:$try/${max:--}${confirm_try:+ c:$confirm_try/${confirm_max:--}}${reset_try:+ r:$reset_try/${reset_max:--}}] $explain" >&2
        last=0; "$@" || last=$?
        if (( step == 0 )); then
            case "$last,$confirm_max" in
                # success and no confirmation requested: done
                0,) return 0 ;;
                # success with confirmation requested: enter the confirm phase
                0,*) (( step++, confirm_try++ )) ||: ;;
                # failure: give up when the try limit (if any) is reached
                *) (( try++ >= max && max )) && return 1 ;;
            esac
        elif (( last != 0 )); then
            # failure during the confirm phase: give up when the reset limit
            # (if any) is reached
            (( reset_max && reset_try >= reset_max )) && return 1
            # go back to the initial phase, but only when at least one
            # confirmation try had already succeeded (confirm_try > 1)
            (( confirm_try > 1 )) && { (( reset_try++ )) ||:; confirm_try=; try=1; step=; }
        elif (( confirm_try++ >= confirm_max && confirm_max )); then
            # success during the confirm phase and the counter reached the
            # target: confirmed
            return 0
        fi
        sleep 1
    done
}

# remote_exec NAME IP COMMAND_STRING...
# Run shell code on a cluster node: the code is written on the standard input
# of the command built from the NODE_BASH_CMD template, where %h is replaced
# by NAME and %i by IP. The code is prefixed by NODE_BASH_INIT (":" if empty).
# Returns 99 if a %h or %i placeholder is left after substitution, else the
# exit status of the command built from the template.
function remote_exec() {
    local name=$1 ip=$2; shift 2
    # the template is split into words following the shell quoting rules
    declare -a "bash_cmd=( $NODE_BASH_CMD )"
    bash_cmd=( "${bash_cmd[@]//%h/$name}" )
    bash_cmd=( "${bash_cmd[@]//%i/$ip}" )
    # The right-hand side is quoted to get a literal comparison: unquoted, it
    # is a glob pattern and a template containing [...] or * would not match
    # itself, which was wrongly reported as an unresolved placeholder.
    if [[ ${bash_cmd[*]} != "${bash_cmd[*]//%[hi]}" ]]; then
        error "$FUNCNAME: Unresolved %h or %i in NODE_BASH_CMD"
        return 99
    fi
    printf '%s\n' "${NODE_BASH_INIT:-:}; $*" |"${bash_cmd[@]}"
}

# get_cluster_topology_or_fatal
# Fill LOCAL_NODE (name of the node answering es-curl), NODES (name => IP),
# NODES_ROLES (name => roles) and MASTER_NODE (row flagged "*" by _cat/nodes).
# Exits through fatal when the local node or the master cannot be found.
function get_cluster_topology_or_fatal() {
    local node_name node_ip node_roles master_flag

    LOCAL_NODE=$(es-curl '_nodes/_local/_none?filter_path=nodes.*.name' |jq -r '.nodes[].name')
    if [[ -z $LOCAL_NODE ]]; then
        fatal 'Failed to get local node name'
    fi

    NODES=()
    NODES_ROLES=()
    MASTER_NODE=
    while read -r node_name node_ip node_roles master_flag; do
        NODES[$node_name]=$node_ip
        NODES_ROLES[$node_name]=$node_roles
        if [[ $master_flag == '*' ]]; then
            MASTER_NODE=$node_name
        fi
    done < <(es-curl '_cat/nodes?h=name,ip,node.role,master')

    # a master in the list is taken as the proof that _cat/nodes succeeded
    if [[ -z $MASTER_NODE ]]; then
        fatal 'Failed to get cluster nodes'
    fi
}

# filter_nodes
# Fill FILTERED_NODES with the names of NODES that pass the --only filter
# (when ONLY_NODES is not empty) and are not listed in SKIP_NODES.
function filter_nodes() {
    local candidate
    FILTERED_NODES=()
    for candidate in "${!NODES[@]}"; do
        # an empty ONLY_NODES selects every node
        [[ ${#ONLY_NODES[@]} -eq 0 || -n ${ONLY_NODES[$candidate]} ]] || continue
        [[ -z ${SKIP_NODES[$candidate]} ]] || continue
        FILTERED_NODES+=( "$candidate" )
    done
}

function plan_order() {
    ORDERED_NODES=( $(for name in "$@"; do
        bucket=2; tier=4
        if [[ $name == "$MASTER_NODE" ]]; then
            # current master always last
            bucket=5
        elif [[ ${NODES_ROLES[$name]} == *[dhwcfs]* ]]; then
            # data nodes first, frozen to hot if multi-tier
            bucket=1
            case "${NODES_ROLES[$name]}" in
                *f*) tier=0 ;;
                *c*) tier=1 ;;
                *w*) tier=2 ;;
                *h*) tier=3 ;;
            esac
        elif [[ ${NODES_ROLES[$name]} == *m* ]]; then
            # voting-only, then master-eligibles
            if [[ ${NODES_ROLES[$name]} == *v* ]]; then
                bucket=3
            else
                bucket=4
            fi
        fi
        printf '%d\t%d\t%s\n' "$bucket" "$tier" "$name"
    done |sort -t $'\t' -k1,1n -k2,2n -k3,3 |cut -d $'\t' -f 3) )
}

# cluster_is_green
# Returns 0 when the 4th field of the first _cat/health line is "green",
# 1 otherwise (including when es-curl fails or prints nothing).
function cluster_is_green() {
    local health
    health=$(es-curl _cat/health -m 3 2>/dev/null |awk 'NR==1 {print $4}')
    [[ $health == green ]]
}

# elasticsearch_has_restarted_and_is_ok
# Meant to run on a cluster node, with previous_elasticsearch_restart (epoch
# seconds) defined by the caller.
# Returns 0 when the elasticsearch unit is active, became active after
# previous_elasticsearch_restart and es-curl _cat/health succeeds; 2 when only
# the health request fails; 1 in the other cases.
function elasticsearch_has_restarted_and_is_ok() {
    local active_since
    if ! systemctl is-active --quiet elasticsearch; then
        return 1
    fi
    active_since=$(LC_ALL=C systemctl show elasticsearch -p ActiveEnterTimestamp --value) || return 1
    # convert the systemd timestamp to epoch seconds
    active_since=$(date -d "$active_since" +%s) || return 1
    (( active_since > previous_elasticsearch_restart )) || return 1
    if ! es-curl _cat/health -m 3 >/dev/null; then
        return 2
    fi
    return 0
}

# reboot_done_and_elasticsearch_is_ok
# Meant to run on a cluster node, with previous_reboot_time (epoch seconds)
# defined by the caller.
# Returns 0 when the btime value of /proc/stat is greater than
# previous_reboot_time and es-curl _cat/health succeeds; 1 when btime is not
# greater; 2 when only the health request fails.
function reboot_done_and_elasticsearch_is_ok() {
    local boot_epoch
    boot_epoch=$(awk '/btime/ {print $2}' /proc/stat)
    if ! (( boot_epoch > previous_reboot_time )); then
        return 1
    elif ! es-curl _cat/health -m 3 >/dev/null; then
        return 2
    fi
    return 0
}

# preflight_remote_exec_or_fatal NAME
# Check that code can be executed on node NAME: the remote shell must run as
# root (UID 0) and es-curl must be found and succeed there.
# Returns 0 on success, exits through fatal with an explanation otherwise.
function preflight_remote_exec_or_fatal() {
    local node=$1 rc reason
    info "$node: Preflight remote exec check"
    # remote exit status: 8 not root, 127 es-curl not found, 9 es-curl failed
    remote_exec "$node" "${NODES[$node]}" '
        [[ $UID == 0 ]] || exit 8
        es-curl > /dev/null || { ret=$?; (( ret != 0 && ret != 127 )) && ret=9; exit "$ret"; }
    '
    rc=$?
    if [[ $rc == 0 ]]; then
        return 0
    fi

    case "$rc" in
        8) reason='root required' ;;
        9) reason='es-curl failed' ;;
        127) reason='es-curl not found' ;;
        *) reason='access error' ;;
    esac
    fatal "$node: Remote exec failed, $reason"
}

# get_low_watermark_or_fatal:
# Read and parse cluster.routing.allocation.disk.watermark.low (transient,
# then persistent, then default value) into
# LOW_WATERMARK=( <type> <value> ) where <type> is "p" (percentage)
# or "b" (integer bytes). Ratio format (0.xx) is normalized to percentage.
# Byte sizes are accepted with the units b, kb, mb, gb, tb, pb (and the short
# forms k, m, g, t, p), in any letter case.
# Exits through fatal when the setting cannot be read or parsed.
function get_low_watermark_or_fatal() {
    LOW_WATERMARK=()
    local raw value unit mult=
    # unsigned decimal number: 85, 85.5, 85. or .85
    local num_re='^([0-9]+\.?[0-9]*|\.[0-9]+)$'
    raw=$(es-curl '_cluster/settings?include_defaults=true&flat_settings=true' |jq -r '
        .transient["cluster.routing.allocation.disk.watermark.low"] //
        .persistent["cluster.routing.allocation.disk.watermark.low"] //
        .defaults["cluster.routing.allocation.disk.watermark.low"] //
        empty
    ')
    [[ -z $raw ]] && fatal 'Failed to get cluster disk watermark low'

    # NOTE(review): Elasticsearch is understood to lowercase byte size values
    # before parsing them ("10GB" is the same as "10gb"); do the same here.
    raw=${raw,,}

    if [[ $raw == *% ]]; then
        value=${raw%\%}
        [[ $value =~ $num_re ]] && LOW_WATERMARK=( p "$value" )
    elif [[ $raw == *.* && $raw =~ $num_re ]]; then
        # ratio: %.6f is used rather than %.6lf, which not every awk accepts
        LOW_WATERMARK=( p "$(LC_ALL=C awk -v "raw=$raw" 'BEGIN { printf "%.6f", raw*100 }')" )
    else
        value=${raw%%[^0-9.]*}
        unit=${raw#"$value"}
        # an unknown unit leaves mult empty and ends in the parse failure below
        case "$unit" in
            pb|p) mult=1125899906842624 ;;
            tb|t) mult=1099511627776 ;;
            gb|g) mult=1073741824 ;;
            mb|m) mult=1048576 ;;
            kb|k) mult=1024 ;;
            b) mult=1 ;;
        esac
        if [[ -n $mult && $value =~ $num_re ]]; then
            # int() truncates like the former integer format did, %.0f prints
            # large values in full with every awk
            LOW_WATERMARK=( b "$(LC_ALL=C awk -v "val=$value" -v "mult=$mult" 'BEGIN { printf "%.0f", int(val*mult) }')" )
        fi
    fi
    [[ -n ${LOW_WATERMARK[0]} && -n ${LOW_WATERMARK[1]} ]] ||
        fatal 'Failed to parse cluster disk watermark low'
    return 0
}

# get_nodes_allocation_or_fatal
# Fill NODES_DISK_PERCENT, NODES_DISK_USED, NODES_DISK_AVAIL and
# NODES_DISK_TOTAL (node name => value, sizes in bytes) from _cat/allocation.
# Exits through fatal when no row at all could be read.
function get_nodes_allocation_or_fatal() {
    local node pct used avail total

    NODES_DISK_PERCENT=()
    NODES_DISK_USED=()
    NODES_DISK_AVAIL=()
    NODES_DISK_TOTAL=()
    while read -r node pct used avail total; do
        NODES_DISK_PERCENT[$node]=$pct
        NODES_DISK_USED[$node]=$used
        NODES_DISK_AVAIL[$node]=$avail
        NODES_DISK_TOTAL[$node]=$total
    done < <(es-curl '_cat/allocation?bytes=b&h=node,disk.percent,disk.used,disk.avail,disk.total')

    if (( ${#NODES_DISK_PERCENT[@]} == 0 )); then
        fatal 'Failed to get nodes disk allocation'
    fi
    return 0
}

# preflight_disk_space_or_fatal NODE
# Check that the disk usage of NODE is below the low watermark with a safety
# margin of DISK_CLOSE_MARGIN_PCT percent (of the disk). Relies on
# LOW_WATERMARK and on the NODES_DISK_* arrays being filled beforehand.
# Returns 0 when the node is fine or has no disk statistics, exits through
# fatal when the disk usage is above or too close to the watermark.
function preflight_disk_space_or_fatal() {
    local node=$1
    # return success if no disk stats, assuming it is not a data node
    [[ -z ${NODES_DISK_PERCENT[$node]} ]] && return 0
    # log the node received as argument (not the caller's loop variable)
    info "$node: Preflight disk space check"

    if [[ ${LOW_WATERMARK[0]} == p ]]; then
        # percentage watermark: compare the used percentage
        awk -v "dp=${NODES_DISK_PERCENT[$node]}" \
            -v "lw_p=${LOW_WATERMARK[1]}" \
            -v "m=$DISK_CLOSE_MARGIN_PCT" \
            'BEGIN { exit (dp >= lw_p - m) ? 1 : 0 }'
    else # ${LOW_WATERMARK[0]} == b
        # byte watermark: it is a minimum of free space, compare the
        # available bytes, the margin being a percentage of the disk size
        awk -v "da=${NODES_DISK_AVAIL[$node]}" \
            -v "dt=${NODES_DISK_TOTAL[$node]}" \
            -v "lw_b=${LOW_WATERMARK[1]}" \
            -v "m=$DISK_CLOSE_MARGIN_PCT" \
            'BEGIN { exit (da <= lw_b + m*dt/100) ? 1 : 0 }'
    fi
    (( $? == 0 )) || fatal "$node: Disk usage too high"
    return 0
}

# do_remote_es_restart NAME
# Restart the elasticsearch service on node NAME, then wait until the node
# reports a start time newer than the one read before the restart (0 if the
# service was not active) and answers health requests, confirmed
# WAIT_CONFIRM__ES_RESTART times.
# Returns 1 if the start time cannot be read or if the restart command fails,
# else the status of wait-until.
function do_remote_es_restart() {
    local name=$1
    local ip=${NODES[$name]}
    # keep this name: the variable is sent to the node with declare -p and
    # read there by elasticsearch_has_restarted_and_is_ok
    local previous_elasticsearch_restart
    local remote_check

    info "$name: Read elasticsearch start time"
    if ! previous_elasticsearch_restart=$(remote_exec "$name" "$ip" '
        systemctl is-active --quiet elasticsearch || { echo 0; exit 0; }
        d=$(LC_ALL=C systemctl show elasticsearch -p ActiveEnterTimestamp --value) || exit 1;
        date -d "$d" +%s') || [[ -z $previous_elasticsearch_restart ]]
    then
        error "$name: Failed to read elasticsearch start time"
        return 1
    fi

    info "$name: Restart elasticsearch"
    remote_exec "$name" "$ip" 'systemctl restart elasticsearch' || {
        error "$name: systemctl restart elasticsearch failed"
        return 1
    }

    # remote code: the checker function, its reference time, then the call
    remote_check="
        $(declare -f elasticsearch_has_restarted_and_is_ok)
        $(declare -p previous_elasticsearch_restart)
        elasticsearch_has_restarted_and_is_ok
    "
    info "$name: Wait until elasticsearch has restarted"
    wait-until -m ":${WAIT_CONFIRM__ES_RESTART}:" remote_exec "$name" "$ip" "$remote_check"
}

# do_remote_reboot NAME
# Reboot node NAME, then wait until the node reports a boot time newer than
# the one read before the reboot and answers health requests, confirmed
# WAIT_CONFIRM__REBOOT times.
# Returns 1 if the boot time cannot be read, else the status of wait-until.
function do_remote_reboot() {
    local name=$1
    local ip=${NODES[$name]}
    # keep this name: the variable is sent to the node with declare -p and
    # read there by reboot_done_and_elasticsearch_is_ok
    local previous_reboot_time
    local remote_check

    info "$name: Read boot time"
    if ! previous_reboot_time=$(remote_exec "$name" "$ip" "awk '/btime/ {print \$2}' /proc/stat") ||
        [[ -z $previous_reboot_time ]]
    then
        error "$name: Failed to read boot time"
        return 1
    fi

    info "$name: Reboot node"
    # systemctl reboot tears down the SSH session, so the exit status is
    # meaningless here; the wait below is what proves the reboot happened.
    remote_exec "$name" "$ip" 'systemctl reboot'

    # remote code: the checker function, its reference time, then the call
    remote_check="
        $(declare -f reboot_done_and_elasticsearch_is_ok)
        $(declare -p previous_reboot_time)
        reboot_done_and_elasticsearch_is_ok
    "
    info "$name: Wait until elasticsearch has restarted after reboot"
    wait-until -m ":${WAIT_CONFIRM__REBOOT}:" remote_exec "$name" "$ip" "$remote_check"
}

# process_node NAME
# Full maintenance cycle of one node:
#   1. es-maintenance begin-node
#   2. with --update: dnf update of elasticsearch on the node
#   3. reboot of the node (with --reboot, unless it is the launcher node) or
#      restart of the elasticsearch service
#   4. es-maintenance end-node
#   5. wait until the cluster is green (confirmed WAIT_CONFIRM__CLUSTER_GREEN
#      times)
# Returns 1 when the update or the restart/reboot fails, else the status of
# the final wait.
function process_node() {
    local name=$1

    info "$name: Run es-maintenance begin-node"
    wait-until -m : es-maintenance begin-node

    if [[ -n $UPDATE ]]; then
        info "$name: Update elasticsearch"
        # The repository name is expanded here and shell-quoted (@Q) so that
        # it reaches dnf as one word whatever it contains.
        remote_exec "$name" "${NODES[$name]}" '
            # fake term to follow dnf progress; the session log goes to
            # /dev/null, else script leaves a "typescript" file on the node
            function faketerm() { script -efqc "${*@Q}" /dev/null; }
            faketerm dnf -y --setopt install_weak_deps=0 --disablerepo=\* \
                --enablerepo='"${REPO_ELASTIC@Q}"' update elasticsearch
        '
        (( $? == 0 )) || { error "$name: Failed to update elasticsearch"; return 1; }
    fi

    if [[ -n $REBOOT && $name != "$LOCAL_NODE" ]]; then
        do_remote_reboot "$name" ||
            { error "$name: Reboot operation failed"; return 1; }
    else
        if [[ -n $REBOOT ]]; then
            # rebooting the launcher would kill this script
            warn "$name: This is the launcher node, reboot will not be performed"
            warn "$name: Elasticsearch will be restarted instead"
        fi
        do_remote_es_restart "$name" ||
            { error "$name: Restart operation failed"; return 1; }
    fi

    info "$name: Run es-maintenance end-node"
    wait-until -m : es-maintenance end-node

    info "$name: Wait until cluster is green"
    wait-until -m ":${WAIT_CONFIRM__CLUSTER_GREEN}:" cluster_is_green
}

# main

# Keep the untouched command line: it is replayed when the script re-executes
# itself through --bash-cmd.
argv_orig=( "$@" )

while (( $# > 0 )); do
    # Options taking a value must be followed by one; without this check the
    # last option of the command line would silently get an empty value.
    case "$1" in
        --repo-elastic|--only|--skip|--bash-cmd|--node-bash-cmd|--node-bash-init)
            (( $# >= 2 )) || { error "Option $1 requires an argument"; exit_usage 1; }
            ;;
    esac
    case "$1" in
        --update) UPDATE=1 ;;
        --repo-elastic) REPO_ELASTIC=$2; shift ;;
        --reboot) REBOOT=1 ;;
        --no-dry-run) DRY_RUN= ;;
        --batch) BATCH=1 ;;
        # node lists are split on commas and blanks (globbing is disabled)
        --only) for i in ${2//,/$IFS}; do ONLY_NODES[$i]=$i; done; shift ;;
        --skip) for i in ${2//,/$IFS}; do SKIP_NODES[$i]=$i; done; shift ;;
        --no-disk-space-check) DISK_CHECK= ;;
        --bash-cmd) BASH_CMD=$2; shift ;;
        --node-bash-cmd) NODE_BASH_CMD=$2; shift ;;
        --node-bash-init) NODE_BASH_INIT=$2; shift ;;
        -h|--help) exit_usage ;;
        --) shift; break ;;
        -*) error "Unknown option: $1"; exit_usage 1 ;;
        *) error "Unexpected positional argument: $1"; exit_usage 1 ;;
    esac
    shift
done

# --bash-cmd: run this script through another bash (eg: on a remote host).
# The script is sent on the standard input of that bash, preceded by two
# generated lines: one giving it the name of this program, one restoring the
# original arguments followed by an empty --bash-cmd, which overrides the
# original one and prevents an endless re-execution.
if [[ -n $BASH_CMD ]]; then
    declare -a "bash_cmd=( $BASH_CMD )"
    exec "${bash_cmd[@]}" < <(
        # @Q: the program name is injected in generated code, quote it so
        # that a name with blanks or special characters stays one word
        echo "BASH_ARGV0=${PROGNAME@Q}"
        echo "set -- ${argv_orig[*]@Q} --bash-cmd ''"
        cat "${BASH_SOURCE[0]}"
    )
fi

# Preflight phase: nothing is changed on the cluster before the end of it.
get_cluster_topology_or_fatal
info "Local node: $LOCAL_NODE"

# A mistyped node name in --only/--skip would otherwise be ignored silently;
# for --skip it means that the node is restarted although the operator asked
# to leave it alone.
for name in "${!ONLY_NODES[@]}"; do
    [[ -n ${NODES[$name]+x} ]] || warn "--only: $name is not a node of this cluster"
done
for name in "${!SKIP_NODES[@]}"; do
    [[ -n ${NODES[$name]+x} ]] || warn "--skip: $name is not a node of this cluster"
done

filter_nodes
[[ -n $FILTERED_NODES ]] || fatal "No nodes match after filtering"

plan_order "${FILTERED_NODES[@]}"
[[ -n $ORDERED_NODES ]] || fatal "Empty plan after ordering"

info "Nodes will be processed in this order: ${ORDERED_NODES[*]}"

# every node must be reachable and usable before touching the first one
for name in "${ORDERED_NODES[@]}"; do
    preflight_remote_exec_or_fatal "$name"
done

if [[ -n $DISK_CHECK ]]; then
    get_low_watermark_or_fatal
    get_nodes_allocation_or_fatal
    for name in "${ORDERED_NODES[@]}"; do
        preflight_disk_space_or_fatal "$name"
    done
else
    warn 'Disk space check skipped (--no-disk-space-check)'
fi

if ! cluster_is_green; then
    fatal "Cluster is not green, abort"
fi

# dry-run (the default) stops here, once all the checks have passed
if [[ -n $DRY_RUN ]]; then
    info 'Dry-run mode, bye'
    exit 0
fi

# Rolling phase: process the nodes one at a time, in the planned order.
info 'Run es-maintenance begin-rolling'
wait-until -m : es-maintenance begin-rolling

for name in "${ORDERED_NODES[@]}"; do
    process_node "$name" && continue
    # no operator to ask: stop at the first failure
    [[ -n $BATCH || ! -t 0 ]] && fatal "$name: process_node failed"

    error "$name: Failed to process node"
    error "$name: Fix it then continue to next node, or abort the script"
    # only an explicit y or n (any case) is accepted
    while :; do
        # read fails on end of input (eg: Ctrl-D, terminal lost): no answer
        # can come anymore, abort instead of prompting forever
        read -r -p "Continue to next node [y/N]: " ||
            fatal 'Abort, no answer to the confirmation prompt'
        case "${REPLY,,}" in
            n) fatal 'Abort at user request, bye' ;;
            y) continue 2 ;;
        esac
    done
done

info 'Run es-maintenance end-rolling'
wait-until -m : es-maintenance end-rolling

info "Done"
