#!/usr/bin/env bash

# Typical usage:
# cat objects.ndjson |es-inject-json - my-index
# cat objects.ndjson |es-inject-json - logs-my-datastream create

# Byte-oriented locale: ${#var} then counts bytes, which the batch-size
# accounting relies on.
export LC_ALL=C
PROGNAME=${0##*/}
# Scratch file holding one _bulk request body at a time. mktemp creates it
# atomically with an unpredictable name, which matters in shared,
# world-writable directories such as /dev/shm. RAM-backed /dev/shm (or
# $TMPDIR) is tried first; mktemp's default location is the fallback.
TEMP_FILE=$(mktemp "${TMPDIR:-/dev/shm}/${PROGNAME}.XXXXXXXXXX" 2>/dev/null || mktemp) || {
    echo "$PROGNAME: cannot create a temporary file" >&2
    exit 1
}
# The file exists from now on, so remove it on every exit path, including
# early usage errors during option parsing.
trap 'rm -f -- "$TEMP_FILE"' EXIT

# Print the help text and exit with status $1 (default 1).
# A non-zero status means a usage error, so the text then goes to stderr;
# with status 0 (--help) it goes to stdout.
# Note: -d is a flag (it switches the bulk action to "create"); the data
# stream name is the second positional argument, not an option value.
function exit_usage() {
    local status=${1:-1}
    [[ $status != 0 ]] && exec >&2
    echo "\
Usage: $PROGNAME [OPTION...]    JSON-FILE|- INDEX
       $PROGNAME [OPTION...] -d JSON-FILE|- DATASTREAM
       $PROGNAME [OPTION...]    JSON-FILE|- DATASTREAM create
Inject JSON data to Elasticsearch

Options:
  -r, --refresh            Post /_refresh on the index before exit
  -d, --datastream         Set op_type create instead of index
  -e, --es-errors FILE     Write elasticsearch rejects to file
  -rt, --read-timeout SEC  Break read wait to send batch, for continuous input
  -mb, --max-bytes BYTES   Max POST _bulk size, default 10MB
  -h, --help               Display this help

Usage example:
$ xzcat lotus-202206.json.xz |$PROGNAME - json-lotus
"
    exit "$status"
}

# Option defaults.
REFRESH=
DATA=
INDEX=
ACTION=
ES_ERRORS=/dev/stdout
READ_TIMEOUT=
MAX_BYTES=$(( 10*1024*1024 ))

# Parse leading options. The first non-option word (including a lone "-",
# meaning stdin) starts the positional arguments: JSON-FILE INDEX [ACTION].
# "--refesh" is the historical misspelling of --refresh, still accepted so
# existing callers keep working. Options that take a value refuse a missing
# one instead of silently using an empty string.
while (( $# > 0 )); do
    case "$1" in
        -r|--refresh|--refesh) REFRESH=1 ;;
        -d|--datastream) ACTION=create ;;
        -e|--es-errors)
            (( $# >= 2 )) || exit_usage 1
            ES_ERRORS=$2
            shift ;;
        -rt|--read-timeout)
            (( $# >= 2 )) || exit_usage 1
            READ_TIMEOUT=$2
            shift ;;
        -mb|--max-bytes)
            (( $# >= 2 )) || exit_usage 1
            MAX_BYTES=$2
            shift ;;
        -h|--help) exit_usage 0 ;;
        -?*) exit_usage 1 ;;
        *) break ;;
    esac
    shift
done

DATA=${1:-/dev/stdin}
INDEX=$2
# -d takes precedence over the third positional argument; default is "index".
ACTION=${ACTION:-${3:-index}}
[[ $DATA == - ]] && DATA=/dev/stdin
[[ -z $INDEX ]] && exit_usage 1
[[ -z $ES_ERRORS ]] && exit_usage 1
# The batch loop writes each document directly after its action line; in the
# _bulk format that is only valid for the "index" and "create" actions.
[[ $ACTION == index || $ACTION == create ]] || exit_usage 1
# The timeout must be a positive integer: "read -t 0" only polls for pending
# input and never consumes a line.
[[ -n $READ_TIMEOUT && ! $READ_TIMEOUT =~ ^0*[1-9][0-9]*$ ]] && exit_usage 1
[[ $MAX_BYTES =~ ^[0-9]+$ ]] || exit_usage 1
# Force base 10: inside (( )) a leading zero means octal ("010" would be 8).
MAX_BYTES=$(( 10#$MAX_BYTES ))

has_read=
has_error=

trap 'rm -f "$TEMP_FILE"' EXIT

# Main loop. jq normalizes the input to one compact JSON document per line.
# Each outer iteration packs a run of those lines into one _bulk request body
# in $TEMP_FILE (an action line followed by the document, for each document),
# POSTs it, and appends every rejected document to $ES_ERRORS.
while read -r; do
    has_read=1
    {
        printf '{"%s": {}}\n%s\n' "$ACTION" "$REPLY"
        # Running body size: document + newline, plus the action line
        # '{"<ACTION>": {}}' + newline (9 fixed bytes + ${#ACTION}).
        # With LC_ALL=C, ${#REPLY} is a byte count.
        bytes=$(( ${#REPLY} + 1 + 9 + ${#ACTION} ))
        # Keep appending until EOF, until the -rt timeout expires with no new
        # line, or until the body exceeds MAX_BYTES. The size check comes
        # after the append, so a batch can overshoot by one document.
        while read -r ${READ_TIMEOUT:+-t "$READ_TIMEOUT"}; do
            printf '{"%s": {}}\n%s\n' "$ACTION" "$REPLY"
            (( bytes += (${#REPLY} + 1 + 9 + ${#ACTION}) ))
            (( bytes > MAX_BYTES )) && break
        done
    } > "$TEMP_FILE"

    # jq: if the response reports errors, print {"key":<item index>,"value":<item>}
    #     for each item whose action result carries an "error"; otherwise halt
    #     with no output.
    # gawk: first reads those lines (stdin), then the request body, where
    #     document i sits on line 2i+2. It prints each failed document paired
    #     with its result, and exits 1 if there was any failure.
    es-curl "/$INDEX/_bulk" \
        -X POST -H 'Content-Type: application/x-ndjson' \
        --data-binary "@$TEMP_FILE" |
    jq -c '
if .errors then .items |to_entries[] |
select(.value |to_entries[] |select(.value.error))
else halt end' |
    gawk -b '
ARGIND==1 && match($0, /^{"key":([0-9]+),"value":(.*)}$/, cap){error[cap[1]]=cap[2]}
ARGIND==2 && FNR%2==0 {i=int(FNR/2)-1; if (i in error) data[i]=$0}
END{ for (i in error) printf("{\"data\":%s,\"result\":%s}\n", data[i], error[i]);
     if (length(error) > 0) exit(1); }' - "$TEMP_FILE" >> "$ES_ERRORS"

    ps=${PIPESTATUS[*]}
    # A non-zero status in any stage (es-curl, jq, or gawk reporting rejected
    # documents) marks the run as failed.
    [[ -z ${ps//[0 ]} ]] || has_error=1
    # Drop an error file that is still empty. Never do this for device paths
    # such as the default /dev/stdout: when stdout is redirected to an empty
    # regular file, "-f /dev/stdout" is true and rm would delete the /dev
    # symlink itself.
    if [[ $ES_ERRORS != /dev/* && -f $ES_ERRORS && ! -s $ES_ERRORS ]]; then
        rm -f -- "$ES_ERRORS"
    fi
# With -rt, --unbuffered makes jq hand over each line as soon as it is
# produced; writing to a pipe, jq would otherwise hold lines in its output
# buffer and the read timeout could not flush partial batches.
done < <(jq -Mc ${READ_TIMEOUT:+--unbuffered} . < "$DATA")

# Exit statuses:
#   2 - the input produced no JSON document at all
#   3 - some batch failed: es-curl, jq or gawk returned non-zero, which
#       includes documents rejected by Elasticsearch
#   4 - the final /_refresh request failed
if [[ -z $has_read ]]; then
    exit 2
fi
if [[ -n $has_error ]]; then
    exit 3
fi

if [[ -n $REFRESH ]]; then
    # es-curl's output is shown only on failure. The failure must also reach
    # the exit status; previously "|| echo" turned it into success.
    if ! out=$(es-curl "/$INDEX/_refresh" -X POST); then
        printf '%s\n' "$out" >&2
        exit 4
    fi
fi
