#!/usr/bin/bash

# Load the optional logcenter profile snippet when it is present
# (presumably what makes helpers such as es-curl reachable -- confirm).
if [[ -f /etc/profile.d/logcenter-path.sh ]]; then
    source /etc/profile.d/logcenter-path.sh
fi

# -f: no pathname expansion, rules such as '*' and the unquoted expansions
#     that get split on IFS later on must never be globbed.
# pipefail: a pipeline fails as soon as any of its stages fails.
set -fo pipefail

# Identity of this run
PROGNAME="${0##*/}"
PID="$$"
LOCKFILE="/dev/shm/$PROGNAME.lock"

# Pristine IFS, restored after every temporary IFS=, split
OIFS="$IFS"

# Option defaults, empty means "off"
THRESHOLD=
RULES=()
MASTER_ONLY=
SHOW=
DEBUG=
DRY_RUN=1   # dry-run unless -y/--yes-do-it is given

# Print the help text and exit.
# Globals:   PROGNAME (read)
# Arguments: $1 - exit status, defaults to 0
# Outputs:   help text on stdout when the status is 0, on stderr otherwise
# Returns:   never, exits with the given status
function exit_usage() {
    local status=${1:-0}
    # A non-zero status means a usage error: the help goes to stderr
    if [[ $status != 0 ]]; then
        exec >&2
    fi
    echo "\
Usage: $PROGNAME [OPTION...] -t USED% -r RULE [-r RULE]...
Delete Elasticsearch indices until disk usage goes below threshold

Options:
  -t, --threshold USED%     Disk usage threshold percent used
  -M, --master-only         Run only on master
  -r, --rule RULE           Add rule, see format below
  -sd, --show-disk          Show per-node disk usage, do nothing else
  -si, --show-indices       Show per-indice (DS) stats, do nothing else
  -d, --debug               Enable debug
  -y, --yes-do-it           Dry-run is enabled by default, disable it
  -h, --help                Display this help


The process is as follows:
  - indices are sorted by age, the oldest indice gets looked at first;
  - looping on that sorted list, only the oldest indice of each
    datastream is considered for deletion;
  - rules are evaluated in order;
  - if there is no match, continue and evaluate the rules with the
    oldest indice of the next datastream;
  - if there is a stop match, the datastream is further ignored,
    continuing with the oldest indice of the next datastream;
  - if there is a match, indice is deleted, which terminates the process;
  - that process repeats until all data nodes are below disk threshold,
    unless no indice matches for deletion.

Rule format is <ds-pattern>,[<max-ds-size>],[<min-keep>]
Rule examples:
  .monitoring-es-*,,1     For a datastream matching .monitoring-es-*,
                          whatever the total size of that datastream,
                          keep at least 1.
  *,1G,                   For any datastream whose total size is <1GB,
                          do nothing.
  *,2G,5                  For any datastream whose total size is <2GB,
                          keep at least 5.
  *,,3                    For any datastream, keep at least 3.

Notes:
  - only datastream indices are considered;
  - <min-keep> value 0 is an error because an indice cannot be
    deleted if alone in a datastream.

Usage example:
$ $PROGNAME -M -t 92% -r '.monitoring-es-*,,1' -r '*,1G,' \\
    -r '*,2G,5' -r '*,,3'
"
    exit "$status"
}

# Send a message to syslog and, when stdout is a terminal, to stdout.
#
# Environment (normally provided by the debug/info/error/fatal wrappers):
#   S          priority handed to "logger -p" (default: info)
#   L          level label of the terminal line (default: INFO)
#   LOG_STDIN  when non-empty, every line read from stdin is logged
#              instead; the arguments then act as a prefix in which each
#              "%l" is replaced by the 0-based line number
# Globals:   PROGNAME, PID, DRY_RUN (read)
# Arguments: the message words, joined with spaces
# Returns:   always 0
function _log() {
    if [[ -n $LOG_STDIN ]]; then
        # REPLY is the return channel of other functions in this script:
        # keep the read loop from overwriting the caller's value.
        local REPLY i=0
        while read -r; do
            # Recurse with LOG_STDIN cleared to log that single line
            LOG_STDIN= "${FUNCNAME[0]}" "${*//%l/$((i++))}$REPLY"
        done
        return 0
    fi

    local S=${S:-info} L=${L:-INFO}
    local tag="${DRY_RUN:+DRY-RUN ** }"
    logger -t "${PROGNAME}[${PID}]" -p "$S" -- "$tag$*"
    # The timestamp is only needed for the terminal line
    if [[ -t 1 ]]; then
        echo "$(date +%Y-%m-%dT%H:%M:%S.%3N%:z) $L $PROGNAME: $tag$*"
    fi
    return 0
}

# Level wrappers around _log: each one selects the syslog priority (S) and
# the label (L) for a single call.

# Log at debug level, silently does nothing unless DEBUG is set.
function debug() {
    if [[ -n $DEBUG ]]; then
        S=debug L=DEBUG _log "$@"
    fi
}

# Log at info level.
function info() {
    S=info L=INFO _log "$@"
}

# Log at error level.
function error() {
    S=err L=ERROR _log "$@"
}

# Log at critical level, then terminate the script with status 2.
function fatal() {
    S=crit L=FATAL _log "$@"
    exit 2
}

# Remove the lock symlink.
# Globals:   LOCKFILE, PID (read), REPLY (written in if_ours mode)
# Arguments: $1 - optional "if_ours": only remove the lock when it exists
#                 and the last path component of its target equals PID
# Returns:   status of rm, or non-zero when an if_ours check does not pass
function release_lock() {
    if [[ $1 != if_ours ]]; then
        # Unconditional removal
        rm "$LOCKFILE"
        return
    fi

    # Each check must pass for the next one to run, rm only runs last
    [[ -e $LOCKFILE ]] &&
        REPLY=$(readlink "$LOCKFILE") &&
        [[ ${REPLY##*/} == $PID ]] &&
        rm "$LOCKFILE"
}

# Take the lock by creating LOCKFILE as a symlink to /proc/PID.
# When the link already exists but its target does not (the -e test follows
# the link), the lock is treated as dead, removed, and creation is retried.
# Globals: LOCKFILE, PID (read)
# Returns: 0 once the lock is held; calls fatal when it cannot be acquired
function acquire_lock_or_fatal() {
    local owner="/proc/$PID"

    # Fast path: nobody holds the lock
    ln -sT "$owner" "$LOCKFILE" && return

    if [[ ! -e "$LOCKFILE" ]]; then
        info 'Removing dead lock'
        release_lock
    fi
    ln -sT "$owner" "$LOCKFILE" || fatal "Failed to acquire lock $LOCKFILE"
}

# Tell whether this host is the elected master, based on "_cat/nodes".
# A line whose next-to-last column is "*" and whose last column equals
# HOSTNAME marks this host as master.
# Globals: HOSTNAME (read), REPLY (written)
# Outputs: REPLY is "master" on the master node, empty otherwise
# Returns: 1 when the node list cannot be fetched, 0 otherwise
function i_am_master() {
    REPLY=$(es-curl _cat/nodes) || return 1
    # NF >= 2 guards $(NF-1): on an empty line it would be a negative field
    # reference, which makes awk abort with an error message.
    REPLY=$(awk -v "me=$HOSTNAME" '
        NF >= 2 && $(NF-1) == "*" && $NF == me { print "master" }' <<<"$REPLY")
    return 0
}

# Collect disk usage of the data nodes from "_nodes/stats/fs".
# Nodes whose joined role list matches "data" are kept and sorted by name.
# Globals: DEBUG, SHOW (read), REPLY (written)
# Outputs: REPLY is an array of "name,used_pct,free_bytes" entries; the
#          list is also sent to debug (DEBUG set) or printed as a table
#          (only SHOW set)
# Returns: calls fatal when the stats cannot be fetched or parsed
function get_data_nodes_stats() {
    local nodes_filter='.nodes |to_entries |map(
            .value |select(.roles|join(",")|test("data")) |
            . + { free_bytes: .fs.total.available_in_bytes } +
                { used_bytes: (.fs.total.total_in_bytes - .fs.total.available_in_bytes) } |
            . + { used_pct: (.used_bytes*100/.fs.total.total_in_bytes) |round }
        ) |
        sort_by(.name) |
        map("\(.name),\(.used_pct),\(.free_bytes)")[]'

    # Unquoted on purpose: one array element per output line
    REPLY=( $(es-curl _nodes/stats/fs |jq -r "$nodes_filter") ) ||
        fatal 'Failed to get data nodes stats'

    # Nothing to display
    if [[ -z $DEBUG && -z $SHOW ]]; then
        return 0
    fi

    {
        echo 'name,used_pct,free_bytes'
        printf '%s\n' "${REPLY[@]}"
    } |
        if [[ -z $DEBUG ]]; then
            column -t -s ,
        else
            LOG_STDIN=1 debug 'Node#%l '
        fi
}

# Collect per-indice statistics of the datastream backing indices.
#
# Steps of the pipeline:
#   - list the .ds-logs-* and .ds-.monitoring-es-* indices with the _cat
#     columns i, ss, cds (used below as index name, size, creation date);
#   - for each indice, up to 20 at a time, run a max(@timestamp)
#     aggregation that carries those three values along in "meta";
#   - print "index max-date size", falling back to the creation date when
#     the aggregation has no value;
#   - prepend the datastream name, i.e. the first three dash-separated
#     tokens following ".ds-";
#   - sort from the date field on, version order, descending;
#   - number the indices within each datastream (1 is the first one seen)
#     and sum up the size of each datastream;
#   - reverse the list.
#
# Globals: DEBUG, SHOW (read), REPLY (written)
# Outputs: REPLY is an array of
#          "ds-name,ds-order,index,max-date,size,ds-size" entries; the list
#          is also sent to debug (DEBUG set) or printed as a table (only
#          SHOW set)
# Returns: calls fatal when any stage of the pipeline fails
function get_indices_stats() {
    REPLY=( $(q_max_date='es-curl "$1/_search" -d "{\"size\":0,
            \"aggs\":{\"ag_max_date\":{\"max\":{\"field\":\"@timestamp\"},
            \"meta\":{\"index\":\"$1\",\"size\":$2,\"creation\":\"$3\"}}}}"'
        es-curl '_cat/indices/.ds-logs-*,.ds-.monitoring-es-*?h=i,ss,cds&bytes=b' |
            xargs -P 20 -L 1 -- bash -c "$q_max_date" -- |
            jq -r '[ .aggregations.ag_max_date.meta.index,
                .aggregations.ag_max_date.value_as_string // .aggregations.ag_max_date.meta.creation,
                .aggregations.ag_max_date.meta.size ] |join(" ")' |
            sed -re 's,^\.ds-([^-]+-[^-]+-[^-]+).*,\1 \0,' |
            sort -k 3Vr |
            {
                # insert per datastream index after datastream name
                # insert total datastream size after index size
                # output csv
                declare -A ds_num=() ds_size=()
                lines=()
                while read -r; do
                    ds=${REPLY%% *}
                    (( ds_num[$ds]++, ds_size[$ds]+=${REPLY##* }  ))
                    lines+=( "$ds ${ds_num[$ds]} ${REPLY#* }" )
                done
                for data in "${lines[@]}"; do
                    echo "${data// /,},${ds_size[${data%% *}]}"
                done
            } |
            tac) )
    # The status is the one of the pipeline above (pipefail is expected)
    (( $? == 0 )) || fatal 'Failed to get indices stats'

    if [[ -n $DEBUG || -n $SHOW ]]; then
        { echo 'ds-name,ds-order,index,max-date,size,ds-size'; (IFS=$'\n'; echo "${REPLY[*]}"); } |
            if [[ -n $DEBUG ]]; then LOG_STDIN=1 debug 'Indice#%l '; else column -t -s ,; fi
    fi
}

# Parse the command line. An option taking a value drops that value with
# its own shift, the shift closing the loop then drops the option itself.
# -sd and -si act on the spot and exit: options placed after them are
# never read.
while [[ $# -gt 0 ]]; do
    case $1 in
        -t | --threshold)
            # A trailing "%" is accepted and stripped
            THRESHOLD=${2%\%}
            shift
            ;;
        -M | --master-only)
            MASTER_ONLY=1
            ;;
        -r | --rule)
            RULES+=( "$2" )
            shift
            ;;
        -sd | --show-disk)
            DEBUG= SHOW=1 DRY_RUN= get_data_nodes_stats
            exit 0
            ;;
        -si | --show-indices)
            DEBUG= SHOW=1 DRY_RUN= get_indices_stats
            exit 0
            ;;
        -d | --debug)
            DEBUG=1
            ;;
        -y | --yes-do-it)
            DRY_RUN=
            ;;
        -h | --help)
            exit_usage
            ;;
        *)
            exit_usage 1
            ;;
    esac
    shift
done

# Validate the options: a threshold made of digits only, within 0..100,
# and a non-empty first rule are mandatory.
if [[ -z $THRESHOLD || -n ${THRESHOLD//[0-9]} ]]; then
    exit_usage 1
else
    # Force base 10. With a leading zero, arithmetic contexts read the
    # value as octal: "010" would silently mean 8, while "08" and "09"
    # raise an error that makes every later threshold comparison fail.
    THRESHOLD=$(( 10#$THRESHOLD ))
fi
(( THRESHOLD > 100 || THRESHOLD < 0 )) && exit_usage 1
[[ -z ${RULES[0]} ]] && exit_usage 1

# With --master-only, quietly stop on any node that is not the master
if [[ -n $MASTER_ONLY ]]; then
    if ! i_am_master; then
        fatal 'Failed to test if master'
    fi
    if [[ $REPLY != master ]]; then
        info 'Not on master node, abort'
        exit 0
    fi
fi

# Do nothing while shards are relocating. A failed request and a missing
# relocating_shards field (printed as an empty string) are both fatal.
if ! relocating_shards=$(es-curl _cluster/health |jq -r '.relocating_shards//""') ||
    [[ -z $relocating_shards ]]; then
    fatal 'Failed to check if shards are relocating'
fi
if [[ $relocating_shards != 0 ]]; then
    info 'Some shards are relocating, exit'
    exit 0
fi

# The trap is installed before the lock is taken: "if_ours" keeps it from
# removing a lock held by another process.
trap 'release_lock if_ours' EXIT
acquire_lock_or_fatal

# Main loop: select (and, unless in dry-run, delete) one indice per pass,
# until every data node is below THRESHOLD or no rule selects anything.
while :; do
    # name, used_pct, free_bytes
    get_data_nodes_stats; nodes=( "${REPLY[@]}" )
    alert_msg=
    for n in "${nodes[@]}"; do
        IFS=,; n=( $n ); IFS=$OIFS
        (( n[1] < THRESHOLD )) && continue
        alert_msg+="${alert_msg:+, }${n[0]} (${n[1]}%)"
    done
    [[ -z $alert_msg ]] && { info 'Disk usage below threshold'; break; }
    info "Disk usage above threshold: $alert_msg"

    # Set as soon as an indice is selected for deletion, dry-run included
    has_deleted=
    # Datastreams already looked at during this pass
    declare -A ds_to_pass=()
    # ds-name, ds-order (highest is oldest), index, max-date, size, ds-size
    get_indices_stats; indices=( "${REPLY[@]}" )
    for i in "${indices[@]}"; do
        IFS=,; i=( $i ); IFS=$OIFS
        [[ -n ${ds_to_pass["${i[0]}"]} ]] && continue
        debug "Considering indice ${i[2]} (${i[1]}), last ${i[3]}, ds size ${i[5]}"

        for r in "${RULES[@]}"; do
            # rule keeps the raw text for messages, r becomes
            # ( ds-pattern max-ds-size min-keep )
            rule=$r; match=( yes )
            IFS=,; r=( $r ); IFS=$OIFS
            # match ds (the right-hand side is unquoted: it is a pattern)
            if [[ ${i[0]} != ${r[0]} ]]; then
                match=( no ds-pattern )
            elif [[ -n ${r[1]} ]]; then
                rule_max_ds_bytes=$(numfmt --from iec -- "${r[1]}") ||
                    fatal "Invalid max-ds-size in rule $rule"
                (( i[5] >= rule_max_ds_bytes )) && match=( no max-ds-size )
            fi
            if [[ ${match[0]} == yes ]]; then
                if [[ -z ${r[2]} ]]; then
                    # no min-keep: the rule protects the datastream
                    match=( yes stop )
                elif [[ -n ${r[2]//[0-9]} || ${r[2]} == 0 ]]; then
                    fatal "Invalid min-keep in rule $rule"
                elif (( i[1] <= r[2] )); then
                    match=( no min-keep )
                fi
            fi

            debug "Rule $rule => ${match[0]}${match[1]:+, ${match[1]}}"
            ds_to_pass["${i[0]}"]=1

            if [[ ${match[0]} == yes ]]; then
                if [[ -z ${match[1]} ]]; then
                    info "Delete indice ${i[2]}"
                    if [[ -z $DRY_RUN ]]; then
                        out=$(es-curl "${i[2]}" -XDELETE) ||
                            fatal "Delete failed${out:+: $out}"
                    fi
                    # Also set in dry-run: a simulated deletion must not
                    # be followed by the "nothing deleted" message, the
                    # dry-run exit below takes care of ending the loop.
                    has_deleted=1
                    break 2
                else # it is a stop action
                    break
                fi
            fi
        done
    done

    [[ -z $has_deleted ]] && { info 'Not match, nothing deleted'; break; }
    # Nothing was really deleted in dry-run, another pass would select
    # the same indice again
    [[ -n $DRY_RUN ]] && break
    sleep 1
done
