#!/usr/bin/lua
-- Make the sibling modules (libcheck, libes, ...) loadable from the directory
-- holding this script. When arg[0] carries no directory part (e.g. the script
-- is started as `lua <name>`), fall back to the current directory instead of
-- producing a meaningless '<name>/?.lua' search entry.
package.path = package.path..';'..(arg[0]:match('^(.*)/[^/]+$') or '.')..'/?.lua'

-- Copyright Julien Thomas < julthomas @ free.fr >
-- Copyright Zenetys < jthomas @ zenetys.com >
-- Author: Julien Thomas
-- Licence: MIT

local lc = require 'libcheck'
local es = require 'libes'
local lp = require 'libperfdata'
local lu = require 'libutil'
require 'print_r'

-- Reference "current time" (epoch seconds) used as the default end of the
-- query window (see pft_defaults). When the FAKE_NOW environment variable
-- parses as a number it replaces the system clock.
local NOW = (tonumber(os.getenv('FAKE_NOW')) or os.time())

-- Plugin identity and help text set on the libcheck module table.
-- NOTE(review): how libcheck consumes these fields is not visible in this
-- file.
lc.checkname = 'SOC-WATCH'
lc.shortdescr = 'Nagios plugin for SOC watch via Elasticsearch query'
lc.progtype = 'soc'
lc.usage_notes = [[
All --es-* options set parameters for a given ES backend, values must
be given as <backend-name>=<value>. For instance, --es-timeout query=10
sets timeout to 10 seconds on the ES backend named 'query'.

When an --es-curl-bin option is set, the curl command is executed
instead of using libcurl bindings. The default format for the command
executed (popen) is '${bin} ${arg}', modifiable with --es-curl-fmt
option, which enables to run via SSH with proper escape, eg:
--es-curl-fmt 'ssh u@h %{sh(bin.." "..arg)}'
]]

-- Command line options: initial values, later completed by option parsing.
--   _es          per-backend settings, filled by setter_opt_es()
--   query_param  parameters available to the query templates
--   *_query_time thresholds (seconds) attached to the query_time perfdata
lc.opts = {
    _es = {},
    query_param = {},
    warning_query_time = 3,
    critical_query_time = 5,
}

-- Defaults handed to es.period_from_to() in init_check(): a 300 second
-- window ending at NOW.
local pft_defaults = {
    period = 300,
    to = NOW,
}

-- Query parameters applied by init_check() for every key the user did not
-- set through --query-param.
local default_query_param = {
    p_date_field = '@timestamp',
    p_filter_base = '*',
    p_key_field = 'source.ip',
    p_order = 'ag_rate',
    p_key_min_count = 0,
    p_key_min_rate = 0,
    p_key_min_ratio = 0,
    p_key_size_lookup = 200,
    p_key_size_limit = 100,
    p_http_status_field = 'http.response.status_code',
    p_http_err_filter_expr = 'params.err_rate > 0'
}

-- Shared state populated by init_check():
local esh = {}      -- backend name => ES handle
local esh_query     -- handle used to run the query (--query-es-backend)
local esh_soc       -- optional handle receiving soc documents (--soc-es-backend)
local query_def     -- entry of `queries` selected by --query-type

-- Option setter for intensity rules. Each call parses one rule and appends
-- it to the list stored in lc.opts[opt.key].
--
-- usage: expr|i1|i2|i3|i4
-- with i1 to i4, boundaries for intensity:
--      - ~:i1, intensity 1
--      - i1:i2, intensity 2
--      - i2:i3, intensity 3
--      - i3:i4, intensity 4
--      - i4:~, intensity 5
-- if i1 > i2 > i3 ..., the scale switches in reverse
-- mode, meaning the smaller the value, the higher the
-- intensity
--
-- Returns the updated list of rules, or nil when value is nil, when a
-- boundary is not a number, or when the rule does not carry exactly four
-- boundaries.
function lc.setter_opt_intensity(opt, value)
    if value == nil then return nil end
    local rule = { scale = {} }
    for field in ((value):gmatch('([^|]+)')) do
        if rule.expr == nil then
            -- first field is the expression, evaluated later by get_intensity()
            rule.expr = field
        else
            local bound = tonumber(field)
            -- A non-numeric boundary used to be dropped silently, which let a
            -- malformed five-boundary spec pass as a valid four-step scale.
            if bound == nil then return nil end
            rule.scale[#rule.scale + 1] = bound
        end
    end
    if #rule.scale ~= 4 then return nil end
    if not lc.opts[opt.key] then lc.opts[opt.key] = {} end
    table.insert(lc.opts[opt.key], rule)
    return lc.opts[opt.key]
end

-- Compute the intensity of one result entry.
--
-- Every rule registered in lc.opts.soc_intensity is evaluated with the
-- entry's data fields (plus `math`) as environment; the value of the rule
-- expression is then located on the rule's 4-boundary scale, giving a level
-- from 1 to 5. In reverse mode (scale[1] > scale[2]) smaller values map to
-- higher levels.
--
-- @param data  table of entry fields exposed to the rule expressions
-- @return the highest level over all rules, or nil when no rule is
--         configured or no rule produced a level
function get_intensity(data)
    -- Level for a single rule, nil when the expression yields no value
    -- (lu.lua is asked for a 'number' result).
    local function get_one(id, rule, env)
        local value = lu.lua('return ('..rule.expr..')', 'intensity #'..id, 'number', nil, env)
        if not value then return nil end
        local reverse = (rule.scale[1] > rule.scale[2])
        local left, right
        -- Walk the intervals from the last boundary down to the open-ended
        -- one below the first boundary (i == 0).
        for i = #rule.scale, 0, -1 do
            right = left
            left = (i > 0 and rule.scale[i] or nil)
            -- interval bounds are kept local; a nil bound means open-ended
            local from, to
            if reverse then from, to = right, left
            else from, to = left, right end
            if (not from or value >= from) and (not to or value < to) then
                return i+1
            end
        end
        return nil
    end

    if not lc.opts.soc_intensity then return nil end
    local max_intensity
    local env = lu.tcopy(data, { math = math })
    for i,r in ipairs(lc.opts.soc_intensity) do
        local one_intensity = get_one(i, r, env)
        if one_intensity then
            if max_intensity then max_intensity = math.max(max_intensity, one_intensity)
            else max_intensity = one_intensity end
        end
    end
    return max_intensity
end

-- Setter shared by every --es-* option. The raw option value has the form
-- <backend-name>=<value>; the value is stored in lc.opts._es[<backend-name>]
-- under the option key stripped of its 'es_' prefix.
--
-- opt.type selects the conversion: 'number' and 'iboolean' go through the
-- matching libcheck setters, 'array' appends to a list, anything else keeps
-- the string as is.
--
-- Returns true on success, nil when the value has no '<name>=' prefix or
-- when the conversion yields a falsy result.
function setter_opt_es(opt, value)
    local backend, raw = value:match('^([^=]+)=(.*)')
    if backend == nil then return nil end
    -- a lone '$' is accepted and ignored
    if raw == '$' then return true end

    local backends = lc.opts._es
    backends[backend] = backends[backend] or {}
    local settings = backends[backend]
    local field = opt.key:sub(4) -- drop the leading 'es_'

    local parsed
    if opt.type == 'number' then
        parsed = lc.setter_opt_number(nil, raw)
    elseif opt.type == 'iboolean' then
        -- NOTE(review): if lc.setter_opt_iboolean returns false for "0",
        -- the falsy check below rejects it; confirm against libcheck.
        parsed = lc.setter_opt_iboolean(nil, raw)
    elseif opt.type == 'array' then
        parsed = settings[field] or {}
        table.insert(parsed, raw)
    else
        parsed = raw
    end

    if not parsed then return nil end
    settings[field] = parsed
    return true
end

-- Command line option definitions handed to libcheck.
-- NOTE(review): the meaning of short/long/required/help/call/type is defined
-- by libcheck and not visible here; from the setters in this file, `call`
-- receives (opt, value) and opt.key presumably is the long name with dashes
-- turned into underscores (e.g. 'es_url', 'query_type').
lc.optsdef = {
    -- per-backend ES settings, all parsed by setter_opt_es (<backend>=<value>)
    { short = 'eu', long = 'es-url', help = 'Set url for an ES backend', call = setter_opt_es },
    { short = 'ei', long = 'es-indice', help = 'Set indice for an ES backend', call = setter_opt_es },
    { short = 'eo', long = 'es-optype', help = 'Set insert op_type for an ES backend', call = setter_opt_es },
    { short = 'ed', long = 'es-doctype', help = 'Set insert doc_type for an ES backend', call = setter_opt_es },
    { short = 'en', long = 'es-username', help = 'Set username for an ES backend', call = setter_opt_es },
    { short = 'ew', long = 'es-password', help = 'Set password for an ES backend', call = setter_opt_es },
    { short = 'ek', long = 'es-api-key', help = 'Set API key for an ES backend', call = setter_opt_es },
    { short = 'eh', long = 'es-header', help = 'Add extra header for an ES backend', call = setter_opt_es, type = 'array' },
    { short = 'ec', long = 'es-netrc', help = 'Set netrc option (0|1) for an ES backend', call = setter_opt_es, type = 'iboolean' },
    { short = 'eT', long = 'es-timeout', help = 'Set timeout (s) for an ES backend', call = setter_opt_es, type = 'number' },
    { short = 'ecs', long = 'es-curl-bin', help = 'Set path to curl (array) for an ES backend', call = setter_opt_es, type = 'array' },
    { short = 'ecf', long = 'es-curl-fmt', help = 'Set curl command format for an ES backend', call = setter_opt_es },
    { short = 'eca', long = 'es-curl-arg', help = 'Add extra curl args (array) for an ES backend', call = setter_opt_es, type = 'array' },
    -- watch identity and query selection
    { short = 'n', long = 'name', required = true, help = 'Short identifier for this watch' },
    { short = 'qt', long = 'query-type', required = true, help = 'Query type to run' },
    { short = 'qp', long = 'query-param', call = lc.setter_opt_kv, help = 'Query parameters (kv)' },
    { short = 'qeb', long = 'query-es-backend', required = true, help = 'ES backend for the query' },
    -- soc document outputs and enrichment
    { short = 'seb', long = 'soc-es-backend', help = 'Enable ES output' },
    { short = 'ssa', long = 'soc-syslog-addr', help = 'Enable syslog output, IP or /dev/log' },
    { short = 'ssp', long = 'soc-syslog-port', call = lc.setter_opt_number, help = 'Syslog output, port' },
    { short = 'sm', long = 'soc-meta', call = lc.setter_opt_kv, help = 'Add metadata to soc documents (kv)' },
    { short = 'si', long = 'soc-intensity', call = lc.setter_opt_intensity, help = 'Intensity specs' },
    -- thresholds applied to the query_time perfdata
    { short = 'wqt', long = 'warning-query-time', call = lc.setter_opt_number, help = 'Query time warning (s)' },
    { short = 'cqt', long = 'critical-query-time', call = lc.setter_opt_number, help = 'Query time critical (s)' },
}

-- queries definitions
queries = {
    -- Query type 'count': defines no `buckets` path, so process_mode_query()
    -- treats the whole response as a single bucket and yields one entry.
    count = {
        -- entry.data field => path read from the response with lu.getpath()
        data = {
            count = 'hits.total.value'
        },
        -- Filter templates expanded against the query parameters; every
        -- filter not flagged `query = false` is AND-ed into ${auto_filters}.
        -- NOTE(review): the `explain` attribute is not interpreted in this
        -- file; presumably read by consumers of soc.explain.filters.
        filters = {
            { name = 'time', value = '${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}]', explain = false },
            { name = 'base', value = '${p_filter_base}' },
        },
        -- request body template passed to esh_query:search()
        query = [[
{
  "query": {
    "query_string": {
      "query": "${auto_filters}"
    }
  },
  "size": 0,
  "track_total_hits": true
}
]],
    },
    -- Query type 'krate': one entry per bucket of a terms aggregation on
    -- ${p_key_field}, with the per-key document count and rate
    -- (count divided by ${p_period}).
    krate = {
        -- path of the bucket list inside the response
        buckets = 'aggregations.ag_key.buckets',
        -- entry.data field => path read from each bucket with lu.getpath()
        data = {
            key = 'key',
            count = 'doc_count',
            rate = 'ag_rate.value',
        },
        -- Filter templates; those not flagged `query = false` are AND-ed
        -- into ${auto_filters} by process_mode_query().
        filters = {
            { name = 'time', value = '${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}]', explain = false },
            { name = 'base', value = '${p_filter_base}' },
        },
        -- Request body template: buckets are filtered on minimum count and
        -- minimum rate, then sorted on ${p_order} and truncated to
        -- ${p_key_size_limit}.
        query = [[
{
  "query": {
    "query_string": {
      "query": "${auto_filters}"
    }
  },
  "size": 0,
  "aggs": {
    "ag_key": {
      "terms": {
        "field": "${p_key_field}",
        "order": { "_count": "desc" },
        "size": "${p_key_size_lookup}"
      },
      "aggs": {
        "ag_filter_key_min_count": {
          "bucket_selector": {
            "buckets_path": {
              "x": "_count"
            },
            "script": "params.x >= ${p_key_min_count}"
          }
        },
        "ag_min_date": {
          "min": {
            "field": "${p_date_field}"
          }
        },
        "ag_max_date": {
          "max": {
            "field": "${p_date_field}"
          }
        },
        "ag_rate": {
          "bucket_script": {
            "buckets_path": {
              "count": "_count"
            },
            "script": "return params.count/${p_period}"
          }
        },
        "ag_filter_min_rate": {
          "bucket_selector": {
            "buckets_path": {
              "rate": "ag_rate"
            },
            "script": "params.rate >= ${p_key_min_rate}"
          }
        },
        "ag_sort": {
          "bucket_sort": {
            "sort": [
              { "${p_order}": { "order": "desc" } }
            ],
            "size": "${p_key_size_limit}"
          }
        }
      }
    }
  }
}
]],
    },
    -- Query type 'ukrate': same shape as 'krate', plus the number of
    -- distinct ${p_ukey_field} values seen for each key (cardinality
    -- aggregation) and a filter on that number.
    -- Note: p_ukey_field and p_ukey_min_count have no entry in
    -- default_query_param, so they have to come from --query-param.
    ukrate = {
        -- path of the bucket list inside the response
        buckets = 'aggregations.ag_key.buckets',
        -- entry.data field => path read from each bucket with lu.getpath()
        data = {
            key = 'key',
            count = 'doc_count',
            ucount = 'ag_ukey.value',
            rate = 'ag_rate.value',
        },
        -- Filter templates; those not flagged `query = false` are AND-ed
        -- into ${auto_filters} by process_mode_query().
        filters = {
            { name = 'time', value = '${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}]', explain = false },
            { name = 'base', value = '${p_filter_base}' },
        },
        -- request body template passed to esh_query:search()
        query = [[
{
  "query": {
    "query_string": {
      "query": "${auto_filters}"
    }
  },
  "size": 0,
  "aggs": {
    "ag_key": {
      "terms": {
        "field": "${p_key_field}",
        "order": { "_count": "desc" },
        "size": "${p_key_size_lookup}"
      },
      "aggs": {
        "ag_filter_key_min_count": {
          "bucket_selector": {
            "buckets_path": {
              "x": "_count"
            },
            "script": "params.x >= ${p_key_min_count}"
          }
        },
        "ag_min_date": {
          "min": {
            "field": "${p_date_field}"
          }
        },
        "ag_max_date": {
          "max": {
            "field": "${p_date_field}"
          }
        },
        "ag_rate": {
          "bucket_script": {
            "buckets_path": {
              "count": "_count"
            },
            "script": "return params.count/${p_period}"
          }
        },
        "ag_filter_min_rate": {
          "bucket_selector": {
            "buckets_path": {
              "rate": "ag_rate"
            },
            "script": "params.rate >= ${p_key_min_rate}"
          }
        },
        "ag_ukey": {
          "cardinality": {
            "field": "${p_ukey_field}"
          }
        },
        "ag_filter_ukey_min_count": {
          "bucket_selector": {
            "buckets_path": {
              "ukey_count": "ag_ukey"
            },
            "script": "params.ukey_count >= ${p_ukey_min_count}"
          }
        },
        "ag_sort": {
          "bucket_sort": {
            "sort": [
              { "${p_order}": { "order": "desc" } }
            ],
            "size": "${p_key_size_limit}"
          }
        }
      }
    }
  }
}
]],
    },
    -- Query type 'http_err': per-key breakdown of HTTP status classes read
    -- from ${p_http_status_field}, with overall rate, error rate and error
    -- ratio. Buckets are kept according to ${p_http_err_filter_expr} and
    -- sorted by error rate, then error ratio.
    http_err = {
        -- path of the bucket list inside the response
        buckets = 'aggregations.ag_key.buckets',
        -- entry.data field => path read from each bucket with lu.getpath()
        data = {
            key = 'key',
            hits = 'doc_count',
            status_good = 'ag_good.doc_count',
            status_401_403 = 'ag_401_403.doc_count',
            status_404 = 'ag_404.doc_count',
            status_other_4xx = 'ag_other_4xx.doc_count',
            status_5xx = 'ag_5xx.doc_count',
            rate = 'ag_rate.value',
            err_rate = 'ag_err_rate.value',
            err_ratio = 'ag_err_ratio.value',
            status_bad = 'ag_bad.value',
        },
        -- Filter templates. 'http_2xx_3xx' carries `query = false`, so
        -- process_mode_query() expands it but leaves it out of
        -- ${auto_filters}; it only travels with the explain data.
        -- NOTE(review): the `explain` attribute (false, or { negate = true })
        -- is not interpreted in this file.
        filters = {
            { name = 'time', value = '${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}]', explain = false },
            { name = 'base', value = '${p_filter_base}' },
            { name = 'http_2xx_3xx', value = '${p_http_status_field}:[200 TO 399]', query = false, explain = { negate = true } },
        },
        -- request body template passed to esh_query:search()
        query = [[
{
  "query": {
    "query_string": {
      "query": "${auto_filters}"
    }
  },
  "size": 0,
  "aggs": {
    "ag_key": {
      "terms": {
        "field": "${p_key_field}",
        "order": { "_count": "desc" },
        "size": "${p_key_size_lookup}"
      },
      "aggs": {
        "ag_filter_key_min_count": {
          "bucket_selector": {
            "buckets_path": {
              "x": "_count"
            },
            "script": "params.x >= ${p_key_min_count}"
          }
        },
        "ag_good": {
          "filter": {
            "query_string": {
              "query": "${p_http_status_field}:[200 TO 399]"
            }
          }
        },
        "ag_401_403": {
          "filter": {
            "query_string": {
              "query": "${p_http_status_field}:(401 OR 403)"
            }
          }
        },
        "ag_404": {
          "filter": {
            "query_string": {
              "query": "${p_http_status_field}:404"
            }
          }
        },
        "ag_other_4xx": {
          "filter": {
            "query_string": {
              "query": "${p_http_status_field}:(400 OR 402 OR [405 TO 499])"
            }
          }
        },
        "ag_5xx": {
          "filter": {
            "query_string": {
              "query": "${p_http_status_field}:[500 TO 599]"
            }
          }
        },
        "ag_bad": {
          "bucket_script": {
            "buckets_path": {
              "count_401_403": "ag_401_403>_count",
              "count_404": "ag_404>_count",
              "count_other_4xx": "ag_other_4xx>_count",
              "count_5xx": "ag_5xx>_count"
            },
            "script": "return (params.count_401_403 ?: 0) + (params?.count_404 ?: 0) + (params.count_other_4xx ?: 0) + (params.count_5xx ?: 0)"
          }
        },
        "ag_err_ratio": {
          "bucket_script": {
            "buckets_path": {
              "numerator": "ag_bad",
              "denominator": "_count"
            },
            "script": "return params.numerator/params.denominator"
          }
        },
        "ag_min_date": {
          "min": {
            "field": "${p_date_field}"
          }
        },
        "ag_max_date": {
          "max": {
            "field": "${p_date_field}"
          }
        },
        "ag_rate": {
          "bucket_script": {
            "buckets_path": {
              "count": "_count"
            },
            "script": "return params.count/${p_period}"
          }
        },
        "ag_err_rate": {
          "bucket_script": {
            "buckets_path": {
              "err_count": "ag_bad"
            },
            "script": "return params.err_count/${p_period}"
          }
        },
        "ag_filter": {
          "bucket_selector": {
            "buckets_path": {
              "hits": "_count",
              "err_ratio": "ag_err_ratio",
              "status_401_403": "ag_401_403>_count",
              "status_404": "ag_404>_count",
              "status_other_4xx": "ag_other_4xx>_count",
              "status_5xx": "ag_5xx>_count",
              "status_good": "ag_good>_count",
              "status_bad": "ag_bad",
              "rate": "ag_rate",
              "err_rate": "ag_err_rate"
            },
            "script": "${p_http_err_filter_expr}"
          }
        },
        "ag_sort": {
          "bucket_sort": {
            "sort": [
              { "ag_err_rate": { "order": "desc" } },
              { "ag_err_ratio": { "order": "desc" } }
            ],
            "size": "${p_key_size_limit}"
          }
        }
      }
    }
  }
}
]],
    },
    -- Query type 'fw': like 'krate', plus the number of distinct
    -- destination.ip and destination.port values per key (both field names
    -- are hard-coded in the template) and an optional bucket filter
    -- expression ${p_fw_filter}.
    -- NOTE(review): the ':-true' suffix looks like a default-value form of
    -- the template expander; confirm against lu.expand / libes.
    fw = {
        -- path of the bucket list inside the response
        buckets = 'aggregations.ag_key.buckets',
        -- entry.data field => path read from each bucket with lu.getpath()
        data = {
            key = 'key',
            count = 'doc_count',
            rate = 'ag_rate.value',
            count_destination_ip = 'ag_distinct_destination_ip.value',
            count_destination_port = 'ag_distinct_destination_port.value',
        },
        -- Filter templates; those not flagged `query = false` are AND-ed
        -- into ${auto_filters} by process_mode_query().
        filters = {
            { name = 'time', value = '${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}]', explain = false },
            { name = 'base', value = '${p_filter_base}' },
        },
        -- request body template passed to esh_query:search()
        query = [[
{
  "query": {
    "query_string": {
      "query": "${auto_filters}"
    }
  },
  "size": 0,
  "aggs": {
    "ag_key": {
      "terms": {
        "field": "${p_key_field}",
        "order": { "_count": "desc" },
        "size": "${p_key_size_lookup}"
      },
      "aggs": {
        "ag_filter_key_min_count": {
          "bucket_selector": {
            "buckets_path": {
              "x": "_count"
            },
            "script": "params.x >= ${p_key_min_count}"
          }
        },
        "ag_min_date": {
          "min": {
            "field": "${p_date_field}"
          }
        },
        "ag_max_date": {
          "max": {
            "field": "${p_date_field}"
          }
        },
        "ag_rate": {
          "bucket_script": {
            "buckets_path": {
              "count": "_count"
            },
            "script": "return params.count/${p_period}"
          }
        },
        "ag_filter_min_rate": {
          "bucket_selector": {
            "buckets_path": {
              "rate": "ag_rate"
            },
            "script": "params.rate >= ${p_key_min_rate}"
          }
        },
        "ag_distinct_destination_ip": {
          "cardinality": {
            "field": "destination.ip"
          }
        },
        "ag_distinct_destination_port": {
          "cardinality": {
            "field": "destination.port"
          }
        },
        "ag_fw_filter": {
          "bucket_selector": {
            "buckets_path": {
              "count": "_count",
              "rate": "ag_rate",
              "distinct_dip": "ag_distinct_destination_ip",
              "distinct_dport": "ag_distinct_destination_port"
            },
            "script": "${p_fw_filter:-true}"
          }
        },

        "ag_sort": {
          "bucket_sort": {
            "sort": [
              { "${p_order}": { "order": "desc" } }
            ],
            "size": "${p_key_size_limit}"
          }
        }
      }
    }
  }
}
]],
    },
    -- Query type 'krate_join': script-mode query joining two searches.
    --   1. "left" search: for each ${p_key_field} bucket matching
    --      ${p_filter_left}, collect the values of the join field.
    --   2. "right" search: count documents matching ${p_filter_right} whose
    --      join field is one of the collected values.
    --   3. for each left key, sum the right-side counts of its join values.
    -- Returns { entries = {...}, query_time = <seconds> }; entries is empty
    -- when either side yields nothing. A failed search ends the check
    -- through lc.die_unkn().
    krate_join = {
        script = function ()
            local qp = lc.opts.query_param
            local result = { entries = {}, query_time = 0 }
            local err

            -- left side: keys and their join values
            local data_left
            data_left, err = esh_query:search([[ {"query":{"query_string":{"query":"${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}] AND ${p_filter_left}"}},"size":0,"aggs":{"ag_key":{"terms":{"field":"${p_key_field}","size":"${p_key_size_lookup}"},"aggs":{"ag_join":{"terms":{"field":"%{p_join_field_left or p_join_field}","size":"${p_key_size_lookup}"}}}}}} ]], nil, es.env(qp))
            if not data_left then lc.die_unkn('Left query failed: '..err) end
            -- tolerate a response without `took`, as process_mode_query() does
            result.query_time = result.query_time + (data_left.took or 0)/1000

            -- the list is exposed to the right-side template through the
            -- query parameters
            qp._all_join_keys = {}
            for _,b_key in ipairs(data_left.aggregations.ag_key.buckets) do
                for _,b_join_left in ipairs(b_key.ag_join.buckets) do
                    table.insert(qp._all_join_keys, b_join_left.key)
                end
            end
            if #qp._all_join_keys == 0 then return result end

            -- right side: document count per join value
            local data_right
            data_right, err = esh_query:search([[ {"query":{"query_string":{"query":"${p_date_field}:[%{p_from_rfc3339} TO %{p_to_rfc3339}] AND ${p_filter_right} AND %{OR(p_join_field, _all_join_keys)}"}},"size":0,"aggs":{"ag_join":{"terms":{"field":"%{p_join_field_right or p_join_field}","size":"${p_key_size_lookup}"}}}} ]], nil, es.env(qp))
            if not data_right then lc.die_unkn('Right query failed: '..err) end
            result.query_time = result.query_time + (data_right.took or 0)/1000
            if #data_right.aggregations.ag_join.buckets == 0 then return result end

            local right_count_by_join_key = {}
            for _,b_join_right in ipairs(data_right.aggregations.ag_join.buckets) do
                right_count_by_join_key[b_join_right.key] = b_join_right.doc_count
            end

            -- join: one entry per left key with enough right-side matches
            local min_count = (tonumber(qp.p_key_min_count) or 0)
            for _,b_key in ipairs(data_left.aggregations.ag_key.buckets) do
                local join_count = 0
                local explain_join_keys = {}
                for _,b_join_left in ipairs(b_key.ag_join.buckets) do
                    table.insert(explain_join_keys, b_join_left.key)
                    if right_count_by_join_key[b_join_left.key] then
                        join_count = join_count + right_count_by_join_key[b_join_left.key]
                    end
                end
                -- NOTE(review): strict comparison, whereas the ES-side
                -- templates use `>=` for p_key_min_count; with the default
                -- of 0 this drops keys without any right-side match.
                if join_count > min_count then
                    table.insert(result.entries, {
                        data = {
                            key = b_key.key,
                            count = join_count,
                            rate = join_count/qp.p_period,
                        },
                        explain = {
                            indice = esh_query.indice,
                            date_field = qp.p_date_field,
                            -- NOTE(review): the 'flow_events' filter is
                            -- hard-coded and not derived from p_filter_*
                            filters = {
                                { name = 'flow_events', value = '_exists_:service.name' },
                                { name = 'message_ids', value = es.OR(qp.p_join_field, explain_join_keys) },
                            },
                        },
                    })
                end
            end
            return result
        end
    },
}

-- Validate the parsed options and prepare the shared state used by the
-- rest of the script:
--   * query_def: the definition selected by --query-type
--   * esh: one ES handle per backend configured through --es-* options
--   * esh_query / esh_soc: handles for the query and the optional ES output
--   * lc.opts.query_param: period values (as p_<key>) and default parameters
--
-- @return true on success, or nil plus an error message
function init_check()
    -- declared up front so that the handle loop below does not write to a
    -- global `err`
    local err

    query_def = queries[lc.opts.query_type]
    if not query_def then return nil, 'invalid query type' end

    for k,v in pairs(lc.opts._es) do
        if lc.opts.debug then v.verbose = true end
        esh[k], err = es.EsHandle.new(v)
        if not esh[k] then return nil, 'backend '..k..': '..tostring(err) end
    end

    esh_query = esh[lc.opts.query_es_backend]
    if not esh_query then return nil, 'invalid query es backend' end

    if lc.opts.soc_es_backend then
        esh_soc = esh[lc.opts.soc_es_backend]
        if not esh_soc then return nil, 'invalid soc es backend' end
    end

    local qp = lc.opts.query_param
    local pft
    pft, err = es.period_from_to({
        period = qp.p_period,
        from = qp.p_from,
        to = qp.p_to },
        pft_defaults)
    if not pft then return nil, 'invalid period spec: '..tostring(err) end
    -- export every computed period value as p_<key>, then drop the raw
    -- p_from / p_to parameters
    for k,v in pairs(pft) do qp['p_'..k] = v end
    qp.p_from = nil
    qp.p_to = nil

    -- fill in defaults for parameters the user did not set
    for k,v in pairs(default_query_param) do
        if qp[k] == nil then
            qp[k] = v
        end
    end

    return true
end

-- Run a template-based query definition (query_def) against the query ES
-- backend and turn the response into result entries.
--
-- Steps: expand the filter templates (in place) and AND the queryable ones
-- into the auto_filters parameter; execute the search; read one entry per
-- response bucket, or a single entry from the whole response when the
-- definition has no `buckets` path.
--
-- @return { entries = {...}, query_time = <seconds> }, or nil plus an
--         error message
function process_mode_query()
    local params = lc.opts.query_param
    local err

    -- expand the filters and assemble the combined query string
    if query_def.filters then
        params.auto_filters = ''
        for _, filter in ipairs(query_def.filters) do
            filter.value, err = lu.expand(filter.value, params, 'filter '..filter.name)
            if err then return nil, err end
            if filter.query ~= false then
                local clause = '('..filter.value..')'
                if #params.auto_filters > 0 then
                    params.auto_filters = params.auto_filters..' AND '..clause
                else
                    params.auto_filters = params.auto_filters..clause
                end
            end
        end
    end

    -- execute the search
    local data
    data, err = esh_query:search(query_def.query, nil, params)
    if not data then return nil, 'request: '..err end

    -- locate the buckets to convert
    local buckets
    if not query_def.buckets then
        buckets = { data }
    else
        buckets = lu.getpath(data, query_def.buckets, 'buckets', 'table')
        if not buckets then return nil, 'could not access response buckets' end
    end

    -- convert each bucket into an entry
    local entries = {}
    for _, bucket in ipairs(buckets) do
        local fields = {}
        for name, path in pairs(query_def.data) do
            local found, path_err = lu.getpath(bucket, path, path)
            if path_err then lc.perr(path_err) end
            fields[name] = found
        end
        table.insert(entries, {
            data = fields,
            explain = {
                indice = esh_query.indice,
                filters = query_def.filters,
                date_field = params.p_date_field,
                key_field = params.p_key_field,
            },
        })
    end

    return {
        entries = entries,
        query_time = data.took and data.took/1000,
    }
end

-- Run a script-based query definition (query_def.script) under pcall.
--
-- The script is expected to return a result table, or nil plus an error
-- message. A raised error, or a nil result without any message, is turned
-- into an error string instead of failing on string concatenation.
--
-- @return the script's result table, or nil plus 'script failed: <reason>'
function process_mode_script()
    local ok, result, err = pcall(query_def.script)
    if not ok then
        -- on a raised error pcall hands back the error value as 2nd result;
        -- it is not necessarily a string
        return nil, 'script failed: '..tostring(result)
    end
    if not result then
        return nil, 'script failed: '..tostring(err or 'no result returned')
    end
    return result
end

-- parse the command line; libcheck ends the check as UNKNOWN on failure
lc.init_opts()

-- prepare ES handles and query parameters
local success, err = init_check()
if not success then
    lc.die_unkn('Check init failed: '..err)
end
lc.dump(esh, 'ES handles')

-- fetch the data with the handler matching the query definition
local result
local fetch = query_def.script and process_mode_script or process_mode_query
result, err = fetch()
if not result then
    lc.die_unkn('Data fetch failed: '..err)
end
lc.dump(result, 'Data fetch result')

-- Build one JSON-encoded soc document per result entry; the list is shared
-- by all outputs.
local docs = {}
for _, entry in ipairs(result.entries) do
    local qp = lc.opts.query_param
    -- the intensity is stored in the entry data, hence exported with it
    entry.data.intensity = get_intensity(entry.data)

    local query_info = {
        name = lc.opts.name,
        type = lc.opts.query_type,
        from = qp.p_from_rfc3339,
        to = qp.p_to_rfc3339,
        period = qp.p_period,
    }
    local doc = {
        ['@timestamp'] = qp.p_to_rfc3339,
        soc = {
            query = query_info,
            meta = lc.opts.soc_meta,
            data = entry.data,
            explain = entry.explain,
        }
    }

    -- when the soc indice is named logs-<dataset>-<namespace>, add the
    -- matching data_stream fields
    if esh_soc then
        local ds_type, ds_dataset, ds_namespace =
            esh_soc.indice:match('^(logs)-([^-]+)-([^-]+)$')
        if ds_type then
            doc.data_stream = { type = ds_type, dataset = ds_dataset, namespace = ds_namespace }
        end
    end

    docs[#docs + 1] = lc.cjson.encode(doc)
end

-- Registry of soc document outputs.
--   specs   name => { fn, args [, err] }; err is set when the output could
--           not be set up or when running it reported an error
--   length  number of outputs processed by run()
--   failed  number of outputs with an error
--   errmsg  comma-separated failure messages
local outputs = {
    specs = { --[[ ex: syslog => { fn, args [, /* added on failure */ err] } ]] },
    length = 0,
    failed = 0,
    errmsg = '',
}

-- Register output `name`; fn is called as fn(docs, ...) by run().
function outputs:add(name, fn, ...)
    self.specs[name] = { fn = fn, args = {...} }
end

-- Record a setup failure for output `name`; run() will not call it.
function outputs:fail(name, err)
    local spec = self.specs[name]
    if not spec then
        spec = {}
        self.specs[name] = spec
    end
    spec.err = err
end

-- Run every output that has no recorded error and collect the failures.
-- An output reports a failure through its second return value.
function outputs:run()
    for name, spec in pairs(self.specs) do
        self.length = self.length + 1
        if not spec.err then
            local _, run_err = spec.fn(docs, table.unpack(spec.args))
            spec.err = run_err
        end
        if spec.err then
            self.failed = self.failed + 1
            local separator = (#self.errmsg > 0 and ', ' or '')
            self.errmsg = self.errmsg..separator.."Output '"..name.."' failed: "..spec.err
        end
    end
end

-- stderr output, always registered: one document per line
outputs:add('stderr', function (lines)
    for _, line in ipairs(lines) do
        io.stderr:write(line, '\n')
    end
end)

-- register syslog output (only when --soc-syslog-addr is given)
if lc.opts.soc_syslog_addr then
    local ret, err = lu.syslog.init('soc.query.'..lc.opts.name, nil,
        lc.opts.soc_syslog_addr, lc.opts.soc_syslog_port)
    if ret then
        outputs:add('syslog', function (docs)
            for _,d in ipairs(docs) do
                -- The original statement lacked the `=`, so the send result
                -- was discarded and failures never surfaced.
                -- NOTE(review): assumes lu.syslog.notice follows the
                -- (result, err) convention of lu.syslog.init; only a
                -- non-nil err is treated as a failure so that a notice()
                -- returning nothing on success is not misreported.
                local _, send_err = lu.syslog.notice(d)
                if send_err then return false, send_err end
            end
        end)
    else
        -- always record a message: an output without fn and without err
        -- would make outputs:run() call a nil function
        outputs:fail('syslog', err or 'syslog init failed')
    end
end

-- ES output, registered when a soc backend was selected: all documents are
-- sent in a single bulk request
if esh_soc then
    outputs:add('es', function (batch)
        local ok, bulk_err = esh_soc:bulk(batch)
        if ok then return end
        return false, bulk_err
    end)
end

-- run every registered output
outputs:run()

-- Metrics reported by the check:
--   docs_generated  number of soc documents built
--   outputs         outputs that succeeded (only when documents exist);
--                   critical unless all of them succeeded
--   query_time      query duration, with the configured thresholds
local perfdata = {}
perfdata[#perfdata + 1] = {
    name = 'docs_generated', value = #docs, uom = '', label = 'Docs generated',
}
if #docs > 0 then
    perfdata[#perfdata + 1] = {
        name = 'outputs', value = outputs.length - outputs.failed, uom = '', label = 'Outputs',
        max = outputs.length, critical = '100%:',
    }
end
if result.query_time then
    perfdata[#perfdata + 1] = {
        name = 'query_time', value = result.query_time, uom = 's', label = 'Query time',
        warning = lc.opts.warning_query_time, critical = lc.opts.critical_query_time,
    }
end

-- the exit code is computed first, before the formatters run
lc.exit_code = lp.compute_perfdata(perfdata)

local summary = '['..lc.opts.name..'] '..lp.format_output(perfdata)
if #outputs.errmsg > 0 then
    summary = summary..' - '..outputs.errmsg
end
lc.exit_message = summary..'|'..lp.format_perfdata(perfdata, true)
