#!/bin/bash

# Load the logcenter environment snippets when they are installed.
# Both files are optional; a missing one is skipped silently.
# NOTE(review): presumably these provide PATH entries and the es-curl /
# es-scroll helpers used further down — confirm against the snippets.
if [[ -f /etc/profile.d/logcenter-path.sh ]]; then
    . /etc/profile.d/logcenter-path.sh
fi
if [[ -f /etc/profile.d/logcenter-es.sh ]]; then
    . /etc/profile.d/logcenter-es.sh
fi

# ---------------------------------------------------------------------------
# Runtime configuration
#
#   PROGNAME  basename of this script; used as the syslog tag and as the
#             prefix of the files kept under /dev/shm.
#   LOCKFILE  symlink used as the single-instance lock.
#   SRC_DS    raw data stream, DST_DS joined data stream. Either may be preset
#             in the environment as one "-"-separated string; after this block
#             both are arrays holding the "-"-separated parts (the script uses
#             elements 0, 1 and 2).
# ---------------------------------------------------------------------------
OIFS=$IFS    # snapshot of the field separator, kept as a global as before
PROGNAME=${0##*/}
LOCKFILE="/dev/shm/$PROGNAME.lock"
# Split with `read -a` under a command-local IFS: unlike an unquoted expansion
# inside ( ... ), this never performs pathname expansion on the value (a `*`
# or `?` stays literal) and the global IFS is never modified.
IFS=- read -r -a SRC_DS <<<"${SRC_DS:-logs-dnsmasq.raw-zlc}"
IFS=- read -r -a DST_DS <<<"${DST_DS:-logs-dnsmasq.dns-zlc}"
# Optional switches, normally supplied through the environment:
#   MASTER_ONLY  non-empty: only run on the elected master node
#   DRY_RUN      non-empty: print the bulk payload instead of sending it
#   QUIET        non-empty: do not echo INFO messages to the terminal
#MASTER_ONLY=
#DRY_RUN=
#QUIET=

# Core logging routine shared by info/warning/error/fatal.
#
# Globals read:
#   PROGNAME         tag for logger and prefix of the terminal line
#   LOGGER_SEVERITY  priority handed to `logger -p` (default: info)
#   STDOUT_LABEL     label printed on the terminal line (default: INFO)
#   DRY_RUN          when non-empty the message is prefixed with "DRY-RUN ** "
#   QUIET            when non-empty, INFO-labelled messages skip the terminal
# Arguments: the message words (joined like "$*").
# Outputs:   always one logger call; additionally one line on stderr, but
#            only when stderr is a terminal.
# Returns:   always 0.
_log() {
    local stamp
    local severity=${LOGGER_SEVERITY:-info}
    local label=${STDOUT_LABEL:-INFO}
    local message="${DRY_RUN:+DRY-RUN ** }$*"

    # %3N (milliseconds) and %:z (+hh:mm zone) are GNU date extensions.
    stamp=$(date +%Y-%m-%dT%H:%M:%S.%3N%:z)

    logger -t "$PROGNAME[$$]" -p "$severity" -- "$message"

    if [[ -n $QUIET && $label == INFO ]]; then
        return 0
    fi
    if [[ -t 2 ]]; then
        echo "$stamp $label $PROGNAME: $message" >&2
    fi
    return 0
}

# Severity front ends for _log. Each one selects the syslog priority
# (LOGGER_SEVERITY) and the terminal label (STDOUT_LABEL) for a single call
# and forwards all arguments as the message.

# Informational message.
info() {
    LOGGER_SEVERITY=info STDOUT_LABEL=INFO _log "$@"
}

# Warning message.
warning() {
    LOGGER_SEVERITY=warning STDOUT_LABEL=WARNING _log "$@"
}

# Error message; execution continues.
error() {
    LOGGER_SEVERITY=err STDOUT_LABEL=ERROR _log "$@"
}

# Critical message, then terminate the script with exit status 2.
fatal() {
    LOGGER_SEVERITY=crit STDOUT_LABEL=FATAL _log "$@"
    exit 2
}

# Remove the lock symlink $LOCKFILE.
#
# Globals read: LOCKFILE
# Returns: 0 when the lock is gone afterwards, including when it did not
#          exist in the first place; non-zero only if rm really failed.
#
# -f keeps the call quiet when there is nothing to remove (the function is
# also used to clear a lock that may already have vanished); -- protects
# against a path that starts with a dash.
function release_lock() {
    rm -f -- "$LOCKFILE"
}

# Take the single-instance lock or terminate through fatal.
#
# The lock is the symlink $LOCKFILE pointing at /proc/<pid of this shell>.
# Creating a symlink fails when the name already exists, so the first
# successful ln owns the lock. When the first attempt fails and the link
# target no longer exists ([[ -e ]] follows the link), the lock is treated
# as stale, removed via release_lock, and creation is tried once more.
#
# Globals read: LOCKFILE
# Returns: 0 with the lock held; does not return otherwise (fatal exits).
acquire_lock_or_fatal() {
    # Fast path: nobody holds the lock.
    ln -sT "/proc/$$" "$LOCKFILE" && return

    # Clear the lock unless its target still resolves.
    [[ -e "$LOCKFILE" ]] || release_lock

    # Second and last attempt.
    ln -sT "/proc/$$" "$LOCKFILE" && return

    fatal "Failed to acquire lock $LOCKFILE"
}


# Determine whether this host is listed as the master in `_cat/nodes`.
#
# The node listing is fetched with es-curl; a line counts as "me being
# master" when its next-to-last column is "*" and its last column equals
# $HOSTNAME.
# NOTE(review): relies on the default _cat/nodes column order (master marker
# followed by node name) — confirm for the deployed Elasticsearch version.
#
# Globals: HOSTNAME (read), REPLY (written: "master" or empty)
# Returns: 1 when es-curl fails, 0 otherwise (check REPLY for the answer).
i_am_master() {
    if ! REPLY=$(es-curl _cat/nodes); then
        return 1
    fi
    REPLY=$(awk -v "me=$HOSTNAME" '
        $(NF-1) == "*" && $NF == me { print "master" }' <<<"$REPLY")
    return 0
}

# Optional gate: when MASTER_ONLY is non-empty the job only proceeds on the
# node that i_am_master reports as master. Any other node logs a note and
# exits successfully; a failed lookup is fatal.
if [[ -n $MASTER_ONLY ]]; then
    if ! i_am_master; then
        fatal 'Failed to test if master'
    fi
    if [[ $REPLY != master ]]; then
        info 'Not on master node, abort'
        exit 0
    fi
fi

# Reference time of this run, in epoch seconds.
if ! now=$(date +%s); then
    fatal 'Failed to get current time'
fi

# Single-instance guard. The EXIT trap is installed only after the lock has
# been obtained, so a run that fails to get the lock never removes the lock
# of the instance holding it.
acquire_lock_or_fatal
trap release_lock EXIT

# Find the @timestamp of the newest raw "query" event that is already marked
# as joined (dnsmasq.joined:true), so the next scan can start near it.
#
# Pipeline:
#   1. jq -n builds the search body: newest matching document only (size 1,
#      sorted descending), returning just the @timestamp field formatted as
#      epoch seconds.
#   2. es-curl POSTs it to <SRC_DS>/_search with -w '\n%{http_code}', so the
#      HTTP status code follows the response body on a separate line.
#      NOTE(review): ES_LOCAL_CURL_FAIL= is presumably what keeps es-curl from
#      aborting on HTTP errors so the status can be inspected — confirm
#      against the es-curl helper.
#   3. jq -rs slurps body + status and prints one of:
#        <timestamp>   status 200 with a hit
#        ""            status 200 without a usable hit
#        "no-index"    status 404
#        "error"       any other status
last_ts=$(jq -n '{
    query:{query_string:{query:"dnsmasq.service:dns AND dnsmasq.joined:true AND dns.type:query"}},
    sort:[{"@timestamp":"desc"}],
    size:1,
    fields:[{field:"@timestamp",format:"epoch_second"}],
    _source:false
}' |
    ES_LOCAL_CURL_FAIL= es-curl "${SRC_DS[0]}-${SRC_DS[1]}-${SRC_DS[2]}/_search" -X POST \
        -d @- -w '\n%{http_code}' |
    jq -rs '
first as $response |
last as $status |
if $status == 200 then
    try ($response.hits.hits |map(.fields["@timestamp"]) |first |first)//""
elif $status == 404 then
    "no-index"
else
    "error"
end')

# Turn the looked-up last_ts into the lower bound of the scan window.
#
# Sentinel values first: "error" aborts the run, "no-index" means there is
# nothing to process yet.
case $last_ts in
    error)
        fatal "Failed to query raw index"
        ;;
    no-index)
        info "No data, abort"
        exit 0
        ;;
esac

# Keep whole seconds only (drop everything from the last "." on).
last_ts=${last_ts%.*}
info "Fetched last_ts $last_ts, diff now $((now - last_ts))"

# Window start:
#   - usable last_ts (at most 10 minutes old): two minutes before it,
#     expressed in epoch milliseconds;
#   - last_ts older than 10 minutes, or no last_ts at all: the relative
#     expression now-120s.
if (( last_ts > 0 )); then
    if (( (now - last_ts) > 60*10 )); then
        warning "last_ts $last_ts is too old, reset window"
        last_ts=now-120s
    else
        last_ts=$(( (last_ts - 60*2) * 1000 ))
    fi
else
    last_ts=now-120s
fi


# Range clause covering [window start, now] on @timestamp.
printf -v query_window '@timestamp:[%s TO now]' "$last_ts"
info "Elasticsearch query window $query_window"

# Search body for the scan: every dns event inside the window that is not yet
# flagged as joined, oldest first. The window is passed to jq as a string
# argument so it is JSON-escaped properly.
scroll_query=$(
    jq -n --arg query_window "$query_window" '{
    query:{query_string:{query:($query_window + " AND dnsmasq.service:dns AND !dnsmasq.joined:true")}},
    sort:[{"@timestamp":"asc"}]}'
)

# ---------------------------------------------------------------------------
# Join pass
#
#   1. es-scroll streams the search results for $scroll_query from the raw
#      data stream; the first jq flattens them to one hit per line.
#   2. The second jq slurps all hits and
#        - groups them by dnsmasq.serial/source.ip/source.port,
#        - keeps only groups that contain a "query" event whose sort
#          timestamp is at least 40 s old,
#        - builds one merged document per group (query + forwarded + answers),
#        - emits bulk actions: an update setting dnsmasq.joined:true for every
#          source document, then a create of the merged document in the
#          destination data stream.
#   3. With DRY_RUN set the bulk payload is printed to stdout. Otherwise it is
#      stored (gzip) as /dev/shm/$PROGNAME.last_bulk.gz, POSTed to _bulk, the
#      response stored as /dev/shm/$PROGNAME.last_result.gz, and a one-line
#      summary is logged.
# NOTE(review): es-scroll is an external helper; it is assumed to print one
# search response (with .hits.hits) per page — confirm against the helper.
# ---------------------------------------------------------------------------
es-scroll "${SRC_DS[0]}-${SRC_DS[1]}-${SRC_DS[2]}" "$scroll_query" |
    jq -Mc '.hits.hits[]' |
    jq -Msc --argjson DST_DS "[\"${DST_DS[0]}\",\"${DST_DS[1]}\",\"${DST_DS[2]}\"]" '
reduce .[] as $i (
    {};
    $i._id as $id |
    $i._index as $index |
    (try (($i.sort |first) / 1000) catch null) as $ts |
    $i._source as $doc |
    # Skip incomplete hits by handing the accumulator back untouched.
    # (Yielding `empty` here is not safe: jq releases that reset the reduce
    # state to null on an empty update would drop every group collected so far.)
    if ($id == null or $index == null or $ts == null or $doc.dnsmasq.serial == null or
        $doc.source.ip == null or $doc.source.port == null) then
        .
    else
        # group by dnsmasq.serial/source.ip/source.port
        ($doc.dnsmasq.serial + "/" + $doc.source.ip + "/" + ($doc.source.port |tostring)) as $key |
        . + { ($key): (.[$key] + [{ref:{index:$index, id:$id}, ts:$ts, doc:$doc}]) }
    end
)//{} |
with_entries(
    # need a query
    (try [(.value[] |select(.doc.dns.type == "query"))]) as $query |
    if ($query |length) == 0 then empty else . end |

    # skip if approaching the end boundary to avoid missing answers
    # note: this is jq <now>, not elasticsearch <now> but that is good enough
    if (now - $query[0].ts) < 40 then empty else . end |

    [(.value[] |select(.doc.dnsmasq.context == "forwarded"))] as $forwarded |
    [(.value[] |select(.doc.dns.type == "answer"))] as $answers |
    .value = {
        ref: .value |map(.ref),
        doc: {
            "@timestamp": $query[0].doc["@timestamp"],
            tags: $query[0].doc.tags,
            host: $query[0].doc.host,
            source: $query[0].doc.source,
            dnsmasq: {
                serial: $query[0].doc.dnsmasq.serial,
                service: $query[0].doc.dnsmasq.service,
                forwarded_ip: $forwarded |map(.doc.dnsmasq.forwarded_ip),
            },
            dns: {
                question: $query[0].doc.dns.question,
                answers: $answers |map(
                    { name: .doc.dns.question.name } +
                    if (.doc.dnsmasq.answer != null) and (.doc.dnsmasq.answer |startswith("<")) and (.doc.dnsmasq.answer |endswith(">")) then
                        { type: .doc.dnsmasq.answer |gsub("[<>]"; "") }
                    elif (.doc.dns.resolved_ip != null) then
                        { data: .doc.dns.resolved_ip }
                    elif (.doc.dnsmasq.answer != null) then
                        { data: .doc.dnsmasq.answer }
                    else
                        {}
                    end
                ),
                resolved_ip: [$answers[] |select(.doc.dns.resolved_ip != null)] |map(.doc.dns.resolved_ip),
            },
            event: $query[0].doc.event,
            network: $query[0].doc.network,
            data_stream: {
                type: $DST_DS[0],
                dataset: $DST_DS[1],
                namespace: $DST_DS[2],
            },
        }
    }
) |
to_entries |map(.value)[] |
(
    (.ref |map(
        {update:{_index:.index, _id:.id}},
        {doc:{dnsmasq:{joined:true}}})[]
    ),
    {create:{_index: (.doc.data_stream.type + "-" + .doc.data_stream.dataset + "-" + .doc.data_stream.namespace) }},
    .doc
)
' |
if [[ -n $DRY_RUN ]]; then
    # Dry run: show the bulk payload, send nothing.
    cat
else
    # Keep the payload of the latest run for inspection.
    gzip -c > "/dev/shm/$PROGNAME.last_bulk.gz"

    # Count the update/create action lines; counters start at 0 so an empty
    # payload is reported as "update=0, create=0".
    input=$(zcat "/dev/shm/$PROGNAME.last_bulk.gz" |
        jq -rs 'reduce .[] as $i ({update:0, create:0};
            if ($i.update) then . + { update: (.update+1)}
            elif ($i.create) then . + { create: (.create+1)}
            else . end) |
            "update=\(.update), create=\(.create)"')

    # The URL is quoted: "?" would otherwise be a glob character.
    zcat "/dev/shm/$PROGNAME.last_bulk.gz" |
        es-curl '/_bulk?refresh=true' -X POST --data-binary @- |
            gzip -c > "/dev/shm/$PROGNAME.last_result.gz"

    # Summarise the response that was just written (same $PROGNAME-based path
    # as above, so the summary always belongs to this run).
    result=$(zcat "/dev/shm/$PROGNAME.last_result.gz" |
        jq -r '"errors=\(.errors), took=\(try .took/1000), ingest_took=\(try .ingest_took/1000), items=\(try .items|length)"')

    info "Input ${input:-unknown}, ${result:-unknown}"
fi
