#!/usr/bin/bash

# Run every tool spawned below in the C locale.
LC_ALL=C
export LC_ALL

# Script name without its directory part; prefixes log messages and names
# the cache/temp/reject files.
PROGNAME=${0##*/}

# Load the optional logcenter environment snippets when they are installed.
if [[ -f /etc/profile.d/logcenter-path.sh ]]; then
    source /etc/profile.d/logcenter-path.sh
fi
if [[ -f /etc/profile.d/logcenter-es.sh ]]; then
    source /etc/profile.d/logcenter-es.sh
fi

# --- Settings ---------------------------------------------------------------
DIR=''                                  # required: base directory to scan
PATTERN='*.log'                         # file name pattern handed to find
TEMP_MAX_SIZE=$(( 100 * 1024 * 1024 ))  # max bytes read from a file per pass (0: no limit)
MODE=raw                                # transform function suffix
DATASTREAM=logs-zlogtail-zlc            # target elasticsearch datastream
UPDATE=1                                # non-empty: rewrite the cache file after each file
HEAD_SIG_SIZE=512                       # leading bytes hashed to detect truncation (0: off)
CHECK_NULL_TAIL=1                       # >0: skip files whose last byte is NUL
CHECK_OPEN_WRITE=1                      # >0: skip files lsof reports as open for writing
UUID=''                                 # default computed
CACHE_FILE=''                           # computed from UUID
TEMP_FILE=''                            # computed from UUID
ES_ERRORS=''                            # computed from UUID

# Write an informational message, prefixed with the program name, on stderr.
function info() {
    printf 'INFO: %s: %s\n' "$PROGNAME" "$*" >&2
}
# Write an error message, prefixed with the program name, on stderr.
function error() {
    printf 'ERROR: %s: %s\n' "$PROGNAME" "$*" >&2
}
# Write a fatal message, prefixed with the program name, on stderr and
# terminate the shell with status 2.
function fatal() {
    printf 'FATAL: %s: %s\n' "$PROGNAME" "$*" >&2
    exit 2
}

# Print the help text followed by the current settings, then exit.
# $1: exit status (default 0); any value other than 0 sends the text to
#     stderr instead of stdout.
function exit_usage() {
    local status=${1:-0}
    if [[ $status != 0 ]]; then
        exec >&2
    fi
    cat <<EOF
Usage: $PROGNAME -d DIR [OPTION...]
Read and follow log files, send to elasticsearch

Options:
  -d, --dir             Base directory to look for files
  -p, --pattern         Pattern to select file names
  -m, --mode            Log transform function
  -s, --datastream      Target elasticsearch datastream
  -h, --help            Display this help

Current values:
  DIR=${DIR}
  PATTERN=$PATTERN
  MODE=$MODE
  DATASTREAM=$DATASTREAM

EOF
    exit "$status"
}


# Make sure the global variable named by $1 is an associative array.
# $1: variable name
# - unknown variable: it is created as an empty global associative array;
# - existing associative array (whatever its other attributes, e.g. readonly
#   or exported): left untouched, content included;
# - anything else: fatal error.
# The attributes are read from `declare -p` rather than ${!1@a}: the latter
# is compared against the exact string "A" and, for arrays, depends on
# whether element 0 is set on some bash releases.
function require-assoc() {
    local _ra_decl _ra_flags
    if ! _ra_decl=$(declare -p -- "$1" 2>/dev/null); then
        declare -gA "$1=()"
        return
    fi
    # "declare -Ar name=(...)" -> "Ar"
    _ra_flags=${_ra_decl#declare -}
    _ra_flags=${_ra_flags%% *}
    [[ $_ra_flags == *A* ]] || fatal "Expected variable $1 type assoc"
}

# Files that need tailing, keyed by path relative to $DIR; each value is
# "inode size" as reported by find.
declare -A NEED_TAIL=()

# Rebuild NEED_TAIL with the regular files under $DIR whose name matches
# $PATTERN and whose inode/size differ from the first two words of their
# CACHE_DATA_state entry (files without an entry are always selected).
# Exits through fatal when $DIR exists but is not a directory.
function find_files() {
    if [[ -e $DIR && ! -d $DIR ]]; then
        fatal "Not a directory: $DIR"
    fi
    # Keep the loop variables local so the callers' globals of the same name
    # are not clobbered.
    local file inode size
    local -a old_state
    NEED_TAIL=()
    # Fields are NUL-terminated so that any character a file name may hold
    # (tab, newline, leading/trailing blanks) is preserved.
    while IFS= read -r -d '' inode &&
          IFS= read -r -d '' size &&
          IFS= read -r -d '' file; do
        # deliberate word splitting: "inode size seconds head_sig"
        old_state=( ${CACHE_DATA_state["$file"]} )
        [[ "$inode $size" == "${old_state[0]} ${old_state[1]}" ]] && continue
        NEED_TAIL["$file"]="$inode $size"
    done < <(find "$DIR" -type f -name "$PATTERN" -printf '%i\0%s\0%P\0')
}

# Write on stdout the part of a log file that has not been processed yet and
# publish the file's new cache state on fd 9.
# $1: file (path relative to $DIR)
# $2: inode
# $3: size
# Tailed chunk goes in $TEMP_FILE
# Reads CACHE_DATA_state[$file] ("inode size seconds head_sig") to find the
# resume offset. On success, a shell assignment updating that entry is
# written on fd 9.
# Returns 0 when a chunk was emitted or the file was deliberately skipped,
# 1 when one of the probe/read commands failed.
function tail_file() {
    local file=$1 inode=$2 size=$3; shift 3

    # read all
    local skip_bytes=0
    local count_bytes=$size
    local head_sig= tail_od= truncated_new_data=

    if (( size < 1 )); then
        info "File $file, too small, skip"
        return 0
    fi

    if (( CHECK_NULL_TAIL > 0 )); then
        # pipefail only applies inside this command substitution; it makes a
        # failing tail (missing/unreadable file) visible instead of being
        # hidden behind a successful od.
        tail_od=$(set -o pipefail; tail -c 1 "$DIR/$file" |od -t x1 -An) ||
            { error "File $file, null tail check failed, skip"; return 1; }
        if [[ $tail_od == ' 00' ]]; then
            info "File $file, null tail, skip"
            return 0
        fi
    fi

    if (( HEAD_SIG_SIZE > 0 && size >= HEAD_SIG_SIZE )); then
        # Same here: without pipefail a failing head would yield the md5 of
        # an empty input, later mistaken for a "head signature change".
        head_sig=$(set -o pipefail; head -c "$HEAD_SIG_SIZE" "$DIR/$file" |md5sum) ||
            { error "File $file, head signature failed, skip"; return 1; }
        head_sig=${head_sig%% *}
        info "File $file, computed head signature $head_sig"
    fi

    # read from last saved position
    local old_state=( ${CACHE_DATA_state["$file"]} )
    if [[ ${old_state[0]} == $inode ]]; then
        # if cache has a head signature, we need the current one to be able to
        # check for file truncation, otherwise we wait for more bytes
        if [[ -n ${old_state[3]} ]]; then
            if [[ -z $head_sig ]]; then
                info "File $file, head signature needed, wait for more bytes"
                return 0
            fi
            if [[ $head_sig != ${old_state[3]} ]]; then
                info "File $file, head signature change, assume truncated new data"
                truncated_new_data=1
            fi
        fi

        if [[ -z $truncated_new_data ]]; then
            if (( size >= old_state[1] )); then
                skip_bytes=$(( old_state[1] ))
                count_bytes=$(( size - old_state[1] ))
            else
                info "File $file, size not increasing, skip"
                return 0
            fi
        fi
    fi

    info "File $file, need to read $count_bytes bytes from $skip_bytes"

    # Cap the chunk; $size is lowered accordingly so that the saved offset
    # points right after the bytes actually read.
    if (( TEMP_MAX_SIZE > 0 && count_bytes > TEMP_MAX_SIZE )); then
        size=$(( size - (count_bytes - TEMP_MAX_SIZE) ))
        count_bytes=$TEMP_MAX_SIZE
        info "File $file, limit read to $count_bytes bytes from $skip_bytes"
    fi

    if (( CHECK_OPEN_WRITE > 0 )) &&
       lsof -nPlXwf -S2 -F al -- "$DIR/$file" |grep -qEi '^[al][wu]'; then
        info "File $file, open write detected, skip"
        return 0
    fi

    dd if="$DIR/$file" iflag=noatime,skip_bytes,count_bytes bs=512 \
        skip="$skip_bytes" count="$count_bytes" status=none > "$TEMP_FILE"
    (( $? == 0 )) || { error "Tail failed: $file"; return 1; }

    # discard last incomplete line
    # wc prints "lines words bytes": 0 lines with some bytes means the last
    # line has no terminating newline yet.
    local wc_last=( $(tail -n 1 "$TEMP_FILE" |wc) )
    if [[ ${wc_last[0]} == 0 && ${wc_last[2]} != 0 ]]; then
        truncate -s "-${wc_last[2]}" "$TEMP_FILE" ||
            { error "Discard of last incomplete line failed: $file"; return 1; }
        info "File $file, incomplete last line, discard ${wc_last[2]} bytes"
        size=$(( size - wc_last[2] ))
    fi

    cat "$TEMP_FILE"
    # New state, shell-quoted so the reader of fd 9 can eval it.
    echo "CACHE_DATA_state[${file@Q}]=${inode@Q}\\ ${size@Q}\\ ${SECONDS@Q}\\ ${head_sig@Q}" >&9
}

# Dispatch stdin to the transform function matching a mode.
# $1: mode, suffix of the function to run (<this function's name>__<mode>)
# $2: file
# Remaining arguments are handed over unchanged. Returns 1 when no function
# exists for the mode, otherwise the status of the selected function.
function transform_chunk() {
    local mode=$1 handler
    shift
    handler="${FUNCNAME[0]}__${mode}"
    if ! declare -F "$handler" >/dev/null; then
        error "Invalid transform mode: $mode"
        return 1
    fi
    "$handler" "$@"
}

# Wrap every stdin line, untouched, into a JSON document:
# the $DATASTREAM_FIELDS members, log.file.path and event.original.
# $1: file
function transform_chunk__raw() {
    local path=$1
    shift
    local -a jq_opts=(
        -rMcR
        --arg file "$path"
        --argjson ds "{$DATASTREAM_FIELDS}"
    )
    jq "${jq_opts[@]}" '$ds + {log:{file:{path:$file}},event:{original:.}}'
}

# Turn IIS W3C log lines read on stdin into one JSON document per line.
# $1: file
# Reads CACHE_DATA_header[$file] (last "#Fields:" list seen for the file) and
# $DATASTREAM_FIELDS. Lines starting with "#" are not emitted; a "#Fields:"
# line replaces the current header. Without a header, only log.file.path and
# event.original are emitted; with one, each field is also stored under
# iis.access using its camel-cased header name.
# On fd 9, at end of input, writes shell statements that store the current
# header in CACHE_DATA_header[$file] and set lines_sent.
function transform_chunk__iis() {
    local file=$1; shift
    require-assoc CACHE_DATA_header
    gawk -b -v "raw_header=${CACHE_DATA_header[$file]}" -v "file=$file" \
            -v "DATASTREAM_FIELDS=$DATASTREAM_FIELDS" '
# Encode value as a JSON token. Unless as_string is set, a value shaped like
# a JSON number is returned bare; leading zeros are excluded because JSON
# forbids them. Anything else becomes a quoted, escaped string.
function json(value, as_string,    buf) {
    if (!as_string && match(value, "^-?(0|[1-9][0-9]*)(\\.[0-9]+)?$"))
        return value;
    buf = "\"";
    while (match(value, /[\x1-\x1f\\"\x7f]/)) {
        buf = buf substr(value, 1, RSTART-1) JESC[substr(value, RSTART, RLENGTH)];
        value = substr(value, RSTART+RLENGTH);
    }
    return buf value "\"";
}

# Single-quote input for the shell reading the metadata.
function bashquote(input) {
    gsub("\x27", "\x27\x5c\x27\x27", input);
    return "\x27" input "\x27";
}

BEGIN {
    lines_sent = 0;
    metafd = "/dev/fd/9";
    # Comma-terminated data_stream member, or nothing when no fields were
    # given, so that the documents stay valid JSON in both cases.
    ds_prefix = (DATASTREAM_FIELDS == "") ? "" : DATASTREAM_FIELDS ",";
    for (i=1; i<=127; i++)
        JESC[sprintf("%c", i)] = sprintf("\\u%04x", i);
}
{ gsub(/\r$/, ""); }
/^#Fields: / { raw_header = substr($0, 10); headers_len = 0; next; }
/^#/ { next; }
raw_header == "" {
    # path and original line are always strings, as in raw mode
    print "{" ds_prefix "\"log\":{\"file\":{\"path\":" json(file, 1) \
        "}},\"event\":{\"original\":" json($0, 1) "}}";
    lines_sent++;
    next;
}
{
    if (headers_len == 0) {
        headers_len = split(raw_header, headers);
        # camel case header names
        for (i = 1; i <= headers_len; i++) {
            headers[i] = gensub(/[^[:alnum:]]/, "-", "g", headers[i]);
            nwords = split(headers[i], hwords, "-+");
            headers[i] = tolower(hwords[1]);
            for (j = 2; j <= nwords; j++)
                headers[i] = headers[i] toupper(substr(hwords[j], 1, 1)) substr(hwords[j], 2);
        }
    }
    fields_len = split($0, fields);
    printf("{%s%s", ds_prefix, "\"log\":{\"file\":{\"path\":" json(file, 1) \
        "}},\"event\":{\"original\":" json($0, 1) "},\"iis\":{\"access\":{");
    nproperties = 0;
    for (i = 1; i <= fields_len; i++) {
        if (headers[i] == "") continue;
        if (headers[i] == "csVersion") force_as_string = 1;
        else force_as_string = 0;
        # keys are always strings; force_as_string applies to the value
        printf("%s", (nproperties?",":"") json(headers[i], 1) ":" json(fields[i], force_as_string));
        nproperties++;
    }
    printf("}}}\n");
    lines_sent++;
}
END {
    print "require-assoc CACHE_DATA_header" >> metafd;
    print "CACHE_DATA_header[" bashquote(file) "]=" bashquote(raw_header) >> metafd;
    print "lines_sent=" bashquote(lines_sent) >> metafd;
}
    '
}

# Parse the command line. Options taking a value consume the next argument;
# anything unknown prints the usage on stderr and exits with status 1.
while (( $# > 0 )); do
    case "$1" in
        -d|--dir) DIR=$2; shift ;;
        -p|--pattern) PATTERN=$2; shift ;;
        -m|--mode) MODE=$2; shift ;;
        -s|--datastream) DATASTREAM=$2; shift ;;
        -h|--help) exit_usage ;;
        *) exit_usage 1 ;;
    esac
    shift
done

# Every setting must end up non-empty (this also catches an option given
# without its value).
[[ -z $DIR ]] && exit_usage 1
[[ -z $PATTERN ]] && exit_usage 1
[[ -z $MODE ]] && exit_usage 1
[[ -z $DATASTREAM ]] && exit_usage 1

# Refuse an unknown mode before any file is read: transform_chunk would
# otherwise fail for every chunk while the cached offsets keep advancing.
declare -F "transform_chunk__$MODE" >/dev/null ||
    fatal "Invalid transform mode: $MODE"

# Short identifier (first 7 hex digits of an md5) of this combination of
# settings; it names the per-instance cache, temp and reject files.
UUID=$(echo -n "$DIR/$PATTERN/$MODE/$DATASTREAM" |md5sum |sed -re 's,^(.{7}).*,\1,')
CACHE_FILE="${ZLOGTAIL_CACHEDIR:-/var/lib}/${PROGNAME}_${UUID}.cache"
TEMP_FILE="${ZLOGTAIL_TEMPDIR:-/dev/shm}/${PROGNAME}_${UUID}.tmp"
ES_ERRORS="${ZLOGTAIL_LOGDIR:-/var/log/logcenter}/${PROGNAME}-${UUID}.rej"

# Restore the state saved by a previous run (the cache file is shell code).
if [[ -e $CACHE_FILE ]] && ! source "$CACHE_FILE"; then
    fatal "Cache found but source failed: $CACHE_FILE"
fi
require-assoc CACHE_DATA_state

STOP_LOOP=
exec 9>&1 # metadata fd
trap 'STOP_LOOP=1' INT TERM
# $TEMP_FILE is quoted inside the trap so a path holding blanks or glob
# characters is removed as a single file.
trap 'rm -f -- "$TEMP_FILE"' EXIT

# split datastream fields
# A "type-dataset-namespace" name becomes the data_stream JSON member added
# to every document; any other shape leaves DATASTREAM_FIELDS empty.
DATASTREAM_FIELDS=
if [[ -n $DATASTREAM ]]; then
    if [[ $DATASTREAM =~ ^([^-]+)-([^-]+)-([^-]+)$ ]]; then
        printf -v DATASTREAM_FIELDS '"data_stream":{"type":"%s","dataset":"%s","namespace":"%s"}' \
            "${BASH_REMATCH[1]}" "${BASH_REMATCH[2]}" "${BASH_REMATCH[3]}"
    fi
fi

# Main loop, until INT/TERM sets STOP_LOOP: look for changed files, send
# their new lines, save the cache, sleep 10 seconds. Every 3600 passes the
# cache is purged of files that are gone.
run=0
while [[ -z $STOP_LOOP ]]; do
    # SECONDS is set to the epoch so tail_file stamps cache entries with it.
    SECONDS=$(date +%s) || fatal "Failed to get current time"
    (( run++ ))
    find_files

    for file in "${!NEED_TAIL[@]}"; do
        [[ -z $STOP_LOOP ]] || break

        info "Tail file $file, ${NEED_TAIL[$file]}${CACHE_DATA_state[$file]:+ vs cache ${CACHE_DATA_state[$file]}}"

        # Inside the substitution fd 9 is pointed at the captured output, so
        # $meta collects the shell statements the pipeline writes on fd 9.
        # "$( (" is spelled with a space so it cannot be read as "$((".
        # NOTE(review): the stdout of the last pipeline stage is captured in
        # $meta as well and then eval-ed; confirm es-inject-json prints
        # nothing on stdout.
        meta=$( (
            tail_file "$file" ${NEED_TAIL["$file"]} |
                transform_chunk "$MODE" "$file" |
                    if [[ -n $DATASTREAM ]]; then
                        es-inject-json -d -e "$ES_ERRORS" - "$DATASTREAM"
                    else
                        cat
                    fi
        ) 9>&1 )

        lines_sent=
        eval "$meta"
        if [[ -n $lines_sent && $lines_sent != 0 ]]; then
            info "File $file, lines sent $lines_sent"
        fi
        if [[ -n $UPDATE ]]; then
            declare -p ${!CACHE_DATA_*} > "$CACHE_FILE"
        fi
    done

    # clean cache
    if [[ -n $UPDATE ]] && (( run % 3600 == 0 )); then
        run=0
        for file in "${!CACHE_DATA_state[@]}"; do
            # Cache keys are relative to $DIR: test the real path, otherwise
            # files that still exist are forgotten and later re-sent in full.
            [[ -e $DIR/$file ]] && continue
            state=( ${CACHE_DATA_state["$file"]} )
            # keep entries touched during the last two days
            (( (SECONDS-state[2]) < 2*86400 )) && continue
            info "File $file, clean from cache"
            for i in ${!CACHE_DATA_*}; do unset "$i[\"$file\"]"; done
        done
        declare -p ${!CACHE_DATA_*} > "$CACHE_FILE"
    fi

    sleep 10
done

info 'Exiting...'
