#!/usr/bin/lua
package.path = package.path..';'..(arg[0]:gsub('/[^/]+$', ''))..'/?.lua'

-- Copyright Julien Thomas < julthomas @ free.fr >
-- Copyright Zenetys < jthomas @ zenetys.com >
-- Author: Julien Thomas
-- Licence: MIT

local lc = require 'libcheck'
local es = require 'libes'
local lp = require 'libperfdata'
local lu = require 'libutil'
local ZCurl = require 'libzcurl'
require 'libzcurlexec'
require 'print_r'

-- Reference time used for the whole run. A numeric FAKE_NOW environment
-- variable replaces os.time(), so it is expected to be in the same unit.
local NOW = (tonumber(os.getenv('FAKE_NOW')) or os.time())

-- Plugin identity and usage text stored on the libcheck module table.
lc.checkname = 'SOC-ALERT'
lc.shortdescr = 'Nagios plugin for SOC alert'
lc.progtype = 'soc'
lc.usage_notes = [[
When a curl-bin option is set, curl command is executed instead of
libcurl bindings. The default format for the command executed (popen)
is '${bin} ${arg}', modifiable with curl-fmt option, which enables to
run via SSH with proper escape, eg: 'ssh u@h %{sh(bin.." "..arg)}'
]]

-- command line options: values preset here before lc.init_opts() is called.
-- The table-valued entries are the ones later appended to or indexed
-- (soc_es_header by table.insert, query_param by key).
lc.opts = {
    query_param = {},
    soc_es_header = {},
    soc_es_netrc = 0,
    soc_es_timeout = 10,
    soc_es_indice = 'logs-soc-default',
    -- compared against differences of NOW, so same unit as NOW
    alert_ttl = 2*3600,
}

-- Passed as second argument to es.period_from_to() in process_options().
-- NOTE: this is a global, not a local.
-- NOTE(review): presumably the fallback period/end time when the query
-- parameters give none -- confirm against libes.
pft_defaults = {
    period = 1800,
    to = NOW,
}

-- Option setter that matches the raw value against opt.pattern.
-- Returns nil when value is nil or when the pattern yields no first
-- result; otherwise returns the full list of match results when
-- opt.array is set, or only the first one.
function lc.setter_opt_match(opt, value)
    if value ~= nil then
        local captures = { value:match(opt.pattern) }
        if captures[1] then
            if opt.array then
                return captures
            end
            return captures[1]
        end
    end
    return nil
end

-- Command line option definitions. Each entry carries a short and a long
-- flag, an optional `required` marker, an optional `call` referencing one
-- of the lc.setter_opt_* functions, and a help string. Parsed values are
-- read later from lc.opts under the long name with dashes replaced by
-- underscores (e.g. soc-es-url -> lc.opts.soc_es_url).
-- NOTE(review): presumably consumed by lc.init_opts() right below.
lc.optsdef = {
    -- alert identity and query parameters
    { short = 'n', long = 'name', required = true, help = 'Short identifier for this alert' },
    { short = 'qp', long = 'query-param', call = lc.setter_opt_kv, help = 'Query parameters (kv)' },
    -- Elasticsearch access
    { short = 'seu', long = 'soc-es-url', required = true, help = 'ES soc, base URL' },
    { short = 'sei', long = 'soc-es-indice', required = true, help = 'ES soc, indice' },
    { short = 'sen', long = 'soc-es-username', help = 'ES soc, username' },
    { short = 'sew', long = 'soc-es-password', help = 'ES soc, password' },
    { short = 'sek', long = 'soc-es-api-key', help = 'ES soc, API Key' },
    { short = 'seh', long = 'soc-es-header', call = lc.setter_opt_array, help = 'ES soc, add header (array)' },
    { short = 'sec', long = 'soc-es-netrc', call = lc.setter_opt_iboolean, help = 'ES soc, enable ~/.netrc' },
    { short = 'seT', long = 'soc-es-timeout', call = lc.setter_opt_number, help = 'ES soc, timeout (s)' },
    { short = 'secs', long = 'soc-es-curl-bin', call = lc.setter_opt_array, help = 'ES soc, path to curl (array)' },
    { short = 'secf', long = 'soc-es-curl-fmt', help = 'ES soc, curl command format' },
    { short = 'seca', long = 'soc-es-curl-arg', call = lc.setter_opt_array, help = 'ES soc, add arg for curl (array)' },
    -- alert outputs
    { short = 'ssa', long = 'soc-syslog-addr', help = 'Enable syslog output, IP or /dev/log' },
    { short = 'ssp', long = 'soc-syslog-port', call = lc.setter_opt_number, help = 'Syslog output, port' },
    { short = 'snp', long = 'soc-nagios-pipe', help = 'Enable Nagios pcheck, command pipe' },
    { short = 'snh', long = 'soc-nagios-host', help = 'Nagios pcheck, host' },
    { short = 'sns', long = 'soc-nagios-service', help = 'Nagios pcheck, service' },
    { short = 'sm', long = 'soc-meta', call = lc.setter_opt_kv, help = 'Add metadata to soc documents (kv)' },
    -- alert decision
    { short = 'ae', long = 'alert-expr', help = 'Return true to alert for a key (lua)' },
    -- help text fixed: "internal" was a typo for "interval"
    { short = 'at', long = 'alert-ttl', call = lc.setter_opt_number, help = 'Key alert min interval (s)' },
}

lc.init_opts()

-- Post-process the parsed options:
--   * turn an API key into an 'authorization' header,
--   * require host and service when the Nagios pipe output is enabled,
--   * resolve the query period through es.period_from_to() and store each
--     returned field in lc.opts.query_param under a 'p_' prefix.
-- Returns true on success, or nil plus an error message.
function process_options()
    local opts = lc.opts

    if opts.soc_es_api_key then
        table.insert(opts.soc_es_header, 'authorization: ApiKey '..opts.soc_es_api_key)
    end
    if opts.soc_nagios_pipe then
        if not opts.soc_nagios_host then return nil, 'nagios host required' end
        if not opts.soc_nagios_service then return nil, 'nagios service required' end
    end

    local qp = opts.query_param
    local pft, err = es.period_from_to({
        period = qp.p_period,
        from = qp.p_from,
        to = qp.p_to },
        pft_defaults)
    if not pft then return nil, err end

    for k, v in pairs(pft) do
        qp['p_'..k] = v
    end
    -- p_from / p_to are dropped once, after the copy, so they never survive
    -- whatever order pairs() walks the keys in. The rest of the script only
    -- reads the p_*_rfc3339 variants and p_period.
    qp.p_from = nil
    qp.p_to = nil

    return true
end

local success, err = process_options()
-- tostring() keeps the message buildable when no error text was returned
if not success then lc.die_unkn('Invalid options: '..tostring(err)) end

-- query elasticsearch
-- Search body template: hits inside the resolved time range, combined with
-- the optional p_filter query parameter. At most 10000 hits are requested
-- and only the soc.data / soc.query parts of each document are fetched.
local q_def = [[
{
  "query": {
    "query_string": {
      "query": "@timestamp:[\"%{p_from_rfc3339}\" TO \"%{p_to_rfc3339}\"] AND (${p_filter:-*})"
    }
  },
  "size": 10000,
  "sort": ["_doc"],
  "_source": ["soc.data", "soc.query"]
}
]]

-- Prefix every backslash and double quote with a backslash; handed to
-- lu.expand() as its fourth argument.
local function q_escape(x)
    local escaped = tostring(x):gsub('([\\"])', '\\%1')
    return escaped
end

local q_post_data = lu.expand(q_def, lc.opts.query_param, 'query', q_escape)
lc.debug('Dump query POST data\n'..q_post_data)

-- cURL handle used for the Elasticsearch request.
local curl_bin = lc.opts.soc_es_curl_bin
local zcurl, err = ZCurl.new_abstract({
    -- NOTE(review): both verification settings are 0 while credentials may
    -- be sent; presumably this disables TLS peer/host checks -- confirm
    -- this is intended.
    ssl_verifypeer = 0,
    ssl_verifyhost = 0,
    followlocation = true,
    failonerror = true,
    cookiefile = '', -- in memory cookies
    writefunction = ZCurl._store_response('body'),
    timeout = lc.opts.soc_es_timeout,
    verbose = lc.opts.debug,
    username = lc.opts.soc_es_username,
    password = lc.opts.soc_es_password,
    netrc = lc.opts.soc_es_netrc,
    httpheader = lc.opts.soc_es_header,
    -- curl-arg and curl-fmt are only forwarded when a curl binary is set
    zcurl_exec_bin = curl_bin,
    zcurl_exec_arg = curl_bin and lc.opts.soc_es_curl_arg,
    zcurl_exec_fmt = curl_bin and lc.opts.soc_es_curl_fmt,
})
if not zcurl then
    lc.die_unkn('cURL init failed: '..err)
end

-- POST the search body to <base url>/<indice>/_search, with a JSON
-- content-type header added to the configured headers.
local search_request = {
    httpheader = lu.acopy(lc.opts.soc_es_header, { 'content-type: application/json', '' }),
    url = lc.opts.soc_es_url..'/'..lc.opts.soc_es_indice..'/_search',
    postfields = q_post_data,
}
local ret, err = zcurl:perform(search_request)
if not ret then
    lc.die_unkn(err)
end
local response_body = zcurl.response.body
lc.dump(response_body, 'Dump query response body')

-- json decode result
local q_data, err = lc.cjson.decode(response_body)
if not q_data then
    lc.die_unkn('JSON decode failed: '..err)
end
lc.dump(q_data, 'Dump decoded JSON response')

-- analyze result data
local hits_by_key = {}   -- key => list of soc.data tables
local stats_by_key = {}  -- key => counters accumulated by one_key()

-- Fold one search hit into hits_by_key and stats_by_key.
--   v      one entry of the hits array; needs _id, _source and a
--          soc.data.key value inside _source
--   stats  ignored, kept only so the signature does not change
-- Returns the key of the hit, or nil when the hit was skipped.
function one_key(v, stats)
    local id, source = v._id, v._source
    if not source then return nil end
    if not id then return nil end

    local key = lu.getpath(source, 'soc.data.key')
    if not key then return nil end

    local bucket = hits_by_key[key]
    if not bucket then
        bucket = {}
        hits_by_key[key] = bucket
    end
    table.insert(bucket, lu.getpath(source, 'soc.data', nil, 'table'))

    local acc = stats_by_key[key]
    if not acc then
        acc = {
            key = key,
            queries = {},
            count = 0,
            distinct_name = 0,
            count_by_name = {},
            intensities = {},
            count_by_intensity = { 0,0,0,0,0 },
            count_by_least_intensity = { 0,0,0,0,0 },
        }
        stats_by_key[key] = acc
    end

    table.insert(acc.queries, id)
    acc.count = acc.count + 1

    local name = lu.getpath(source, 'soc.query.name', nil, 'string')
    local intensity = lu.getpath(source, 'soc.data.intensity', nil, 'number')

    if name then
        acc.count_by_name[name] = (acc.count_by_name[name] or 0) + 1
    end

    if intensity then
        table.insert(acc.intensities, intensity)
        -- only intensities that index one of the five slots are counted
        local at_level = acc.count_by_intensity[intensity]
        if at_level then
            acc.count_by_intensity[intensity] = at_level + 1
            -- slot i counts the hits whose intensity is at least i
            for i = 1, intensity do
                acc.count_by_least_intensity[i] = acc.count_by_least_intensity[i] + 1
            end
        end
    end

    return key
end
-- Fold every hit into the per-key accumulators. The hits array is checked
-- first so that an unexpected response body is reported through
-- lc.die_unkn() instead of raising a Lua indexing error.
local hits = type(q_data) == 'table' and type(q_data.hits) == 'table' and q_data.hits.hits
if type(hits) ~= 'table' then
    lc.die_unkn('Unexpected search response: hits array not found')
end
for _, v in ipairs(hits) do
    one_key(v)
end

-- Finalize the per-key statistics: replace the per-name counters by the
-- number of distinct names, and the raw intensity list by its min, max,
-- sum and average (left unset when no intensity was collected).
for _, o in pairs(stats_by_key) do
    for _ in pairs(o.count_by_name) do
        o.distinct_name = o.distinct_name + 1
    end
    o.count_by_name = nil

    local n = #o.intensities
    if n > 0 then
        -- single pass; avoids unpacking the whole list onto the stack
        local lo, hi, sum = o.intensities[1], o.intensities[1], 0
        for i = 1, n do
            local x = o.intensities[i]
            if x < lo then lo = x end
            if x > hi then hi = x end
            sum = sum + x
        end
        o.min_intensity = lo
        o.max_intensity = hi
        o.sum_intensity = sum
        o.avg_intensity = sum / n
    end
    o.intensities = nil
end

-- Build the alert document for one key.
--   o  per-key statistics table; it is embedded as soc.data by reference
-- Careful: o is modified, its `queries` field is moved to
-- soc.explain.queries and cleared on o itself.
-- Returns the document table.
function build_alert(o)
    local qp = lc.opts.query_param

    -- detach the query ids before embedding o
    local queries = o.queries
    o.queries = nil

    local alert = {
        name = lc.opts.name,
        from = qp.p_from_rfc3339,
        to = qp.p_to_rfc3339,
        period = qp.p_period,
    }

    return {
        ['@timestamp'] = qp.p_to_rfc3339,
        soc = {
            alert = alert,
            meta = lc.opts.soc_meta,
            data = o,
            explain = { queries = queries },
        },
    }
end

-- alert / output
local alerts = {}
local errors = {}

-- Registry of output channels.
--   add(name, fn, ...)  register fn; extra arguments are stored and passed
--                       to fn after the arguments given to run()
--   fail(name, err)     mark a channel as failed before it runs
--   run(...)            call every channel that has not failed; a second
--                       return value from fn is recorded as its error
-- `length` counts the channels seen by run() and `failed` those with an
-- error; every error message is also appended to `errors`.
local outputs = {
    specs = { --[[ name => { fn, args [, err] } ]] },
    add = function(self, name, fn, ...)
        -- keep the argument count so nil arguments are not lost
        self.specs[name] = { fn = fn, args = { n = select('#', ...), ... } }
    end,
    fail = function(self, name, err)
        if not self.specs[name] then self.specs[name] = {} end
        -- a failed channel may have no fn: it must always carry an error so
        -- that run() never tries to call it
        self.specs[name].err = err or ('output '..tostring(name)..' unavailable')
    end,
    length = 0,
    failed = 0,
    run = function(self, ...)
        local n_run = select('#', ...)
        for _, s in pairs(self.specs) do
            self.length = self.length + 1
            if not s.err then
                -- all run-time arguments first, then the stored ones; a
                -- call in non-final position would be cut to one value
                local argv = { ... }
                for i = 1, s.args.n do
                    argv[n_run + i] = s.args[i]
                end
                local _, err = s.fn(table.unpack(argv, 1, n_run + s.args.n))
                s.err = err
            end
            if s.err then
                self.failed = self.failed + 1
                table.insert(errors, s.err)
            end
        end
    end,
}

-- stderr
-- Passes each alert, JSON-encoded, to lc.perr(), one call per alert.
local function emit_stderr(list)
    for i = 1, #list do
        lc.perr(lc.cjson.encode(list[i]))
    end
end
outputs:add('stderr', emit_stderr)

-- syslog
-- Registered only when a syslog address is given. An init failure is
-- recorded as a failed output instead of aborting the check.
if lc.opts.soc_syslog_addr then
    local ret, err = lu.syslog.init('soc.alert.'..lc.opts.name, nil,
        lc.opts.soc_syslog_addr, lc.opts.soc_syslog_port)

    if ret then
        outputs:add('syslog', function (alerts)
            for _,a in ipairs(alerts) do
                -- capture the result of the send itself; the outer `ret`
                -- belongs to init and is always true in this branch
                local sent, send_err = lu.syslog.notice(lc.cjson.encode(a))
                -- NOTE(review): assumes notice() follows the nil-plus-message
                -- convention on failure, like init() -- confirm in libutil.
                -- A falsy result without a message is not treated as an error.
                if not sent and send_err then return false, send_err end
            end
        end)
    else
        outputs:fail('syslog', err)
    end
end

-- nagios
-- Registered only when a command pipe is given: one
-- PROCESS_SERVICE_CHECK_RESULT line with status lc.CRITICAL is written per
-- alert. A pipe that cannot be opened is recorded as a failed output.
if lc.opts.soc_nagios_pipe then
    local fd, err = io.open(lc.opts.soc_nagios_pipe, 'a')
    if fd then
        outputs:add('nagios', function (alerts)
            for _,a in ipairs(alerts) do
                local msg = '['..lc.opts.name.."] Key '"..a.soc.data.key.."' on last "..
                    lc.opts.query_param.p_period..'s, count: '..a.soc.data.count..' '..
                    lc.cjson.encode(a.soc.data.count_by_intensity)..', distinct_name: '..
                    a.soc.data.distinct_name
                local written, write_err = fd:write('['..NOW..'] PROCESS_SERVICE_CHECK_RESULT;'..
                    tostring(lc.opts.soc_nagios_host)..';'..
                    tostring(lc.opts.soc_nagios_service)..';'..
                    lc.CRITICAL..';'..msg..'\n')
                if not written then return false, write_err end
            end
            -- writes are buffered: flush so the commands are handed over now
            -- and a write failure is reported rather than lost
            local flushed, flush_err = fd:flush()
            if not flushed then return false, flush_err end
        end)
    else
        outputs:fail('nagios', err)
    end
end

-- Load the cache and make sure it has a last_alert table
-- (key => time of the last alert raised for that key).
local cache = lc.load_cache()
local last_alert = cache.last_alert
if not last_alert then
    last_alert = {}
    cache.last_alert = last_alert
end
-- expire old entries: anything older than the alert TTL is forgotten
for key, stamp in pairs(last_alert) do
    local age = NOW - stamp
    if age > lc.opts.alert_ttl then
        last_alert[key] = nil
    end
end

-- evaluate for alert
-- Helper methods attached to each per-key statistics table, which is given
-- to the alert expression under the name `k`:
--   k:set(name, value)  store a value on the statistics table
--   k:sum(name)         add up field `name` over all soc.data hits of the key
local expr_stats_methods = { __index = {
    set = function(self, stats_name, value)
        self[stats_name] = value
    end,
    sum = function(self, stats_name)
        local total = 0
        for _, hit in ipairs(hits_by_key[self.key]) do
            total = total + hit[stats_name]
        end
        return total
    end,
}}

for _, o in pairs(stats_by_key) do
    setmetatable(o, expr_stats_methods)
    local verdict, err = lu.lua(lc.opts.alert_expr, 'expr', 'boolean', nil, { k = o })
    if verdict == nil then
        -- XXX: output can be very long
        -- XXX: display only first 10 errors, but count them all
        table.insert(errors, err)
    elseif verdict == true then
        -- alert only when the key's previous alert is older than the TTL
        local previous = cache.last_alert[o.key] or 0
        if NOW - previous > lc.opts.alert_ttl then
            cache.last_alert[o.key] = NOW
            table.insert(alerts, build_alert(o))
        end
    end
end

-- Persist the alert times, then hand the alerts to every output.
lc.save_cache(cache)
outputs:run(alerts)

-- Metrics reported by the check (global, as before): any error is critical,
-- and so is any output that did not succeed.
local outputs_ok = outputs.length - outputs.failed
perfdata = {
    { name = 'alerts', value = #alerts, uom = '', label = 'Alerts' },
    { name = 'errors', value = #errors, uom = '', label = 'Errors', critical = 0 },
    { name = 'outputs', value = outputs_ok, uom = '', label = 'Outputs',
      max = outputs.length, critical = '100%:' },
}

lc.exit_code = lp.compute_perfdata(perfdata)

-- Message layout: "[name] <output> - <errors>|<perfdata>", the errors part
-- being present only when at least one error was collected.
local message_head = '['..lc.opts.name..'] '..lp.format_output(perfdata)
local error_suffix = ''
if #errors > 0 then
    error_suffix = ' - '..table.concat(errors, ', ')
end
local message_tail = '|'..lp.format_perfdata(perfdata, true)
lc.exit_message = message_head..error_suffix..message_tail

