Установка filebeat на windows

image

Передо мной встала задача сбора логов с парка серверов на ОС Windows и ОС Linux. Для того чтобы решить её я воспользовался стэком OpenSearch. Во время настройки OpenSearch мне не хватало в открытых источниках наглядных примеров, а информация на официальных сайтах ElasticSearch и OpenSearch мне показалась обрывочной, слабо привязанной к реальным ситуациям. Поэтому я решил поделиться своим опытом и описать основные моменты установки и некоторые сценарии настройки и применения стэка OpenSearch, которые я применил в своей практике.

1. Введение

1.1. Коротко о том, что такое OpenSearch

OpenSearch – система полнотекстового поиска. Является форком Elasticsearch. Разработана компанией A9.com, дочерней компанией Amazon.com.

Стэк OpenSearch (OpenSearch + Logstash-oss-with-OpenSearch-output-plugin + OpenSearch–Dashboards) является бесплатным аналогом с отрытым исходным кодом стэку ELK (Elasticsearch + Logstash + Kibana). В совокупности с Beat’ами (Filebeat, Winlogbeat, и т.д.) образуют полный цикл управления логами: сбор, систематизация, поиск.

Можно найти и множество других применений этому стэку, но в этой статье речь пойдет главным образом именно о логах.

1.2. Коротко о форках Elasticsearch

Elasticsearch развивается под этим названием с 2010 года. С 2018 года, с версии 6.3 лицензия на ELK меняется, появляется платная и бесплатная версия. Позже Amazon создает форк Elasticsearch и называет его «Open Distro for Elasticsearch». А в 2021 году Amazon создает новый форк и называет его «OpenSearch». Немного позже Российская компания Arenadata становится официальным партнёром OpenSearch в России и выпускает отечественный форк под названием «Arenadata LogSearch (ADLS)» под платной лицензией.

1.3. Что и зачем будем настраивать

Стэк OpenSearch мы будем разворачивать из архивов, это особенно актуально в закрытых средах. Кроме того, мы не будем использовать Kubernetes и Docker, то есть будем производить установку в классической среде.

В качестве ОС под OpenSearch будем использовать Linux Ubuntu 20.04.
Логи будут транспортироваться с серверов, на которых они возникли на сервер OpenSearch в виде сообщений. Для того, чтобы гарантировать доставку сообщений, в качестве транспорта мы будем использовать сервис Apache Kafka.

Apache Kafka – это распределённый программный брокер сообщений, который гарантирует доставку сообщений.

Мы введем OpenSearch в кластер, но будет описан ввод только основной ноды.

Кластер OpenSearch позволяет решать сразу несколько задач. Во-первых, традиционно, кластер решает задачу масштабирования, распределения нагрузки и отказоустойчивости. Во-вторых, данные накопленные в OpenSearch, можно распределять по их значимости на «Горячие» («Hot»), «Теплые» («Warm») и «Холодные» («Cold»). А ноды кластера можно настроить так чтобы они принимали только «Горячие» данные, или «Теплые» данные, или только «Холодные». И если на ноду под «Холодные» данные выделять менее производительные мощности, то это позволит сэкономить ресурсы.

1.4. Настраиваемая схема

На серверы, с которых нужно собирать логи, мы установим Beat’ы. На серверы под управлением ОС Windows мы установим Filebeat и Winlogbeat. На серверы под управлением Linux мы установим только Filebeat. Beat’ы будут отправлять сообщения с логами в Kafk’у. Logstash будет брать эти сообщения из Kafka, обрабатывать их и отправлять в OpenSearch. Пользователь будет осуществлять просмотр и поиск по логам через OpenSearch-Dashboards. Схема взаимодействия представлена на Рис.1.

image

Рис.1

Договоримся, что машина Node OpenSearch будет иметь:
1. Имя «server-elk01»;
2. Ip адрес «10.0.0.70».
Машина с Kafka будет иметь:
1. Ip адрес «10.0.0.60».

2. Установка стэка OpenSearch

2.1. Подготовка Linux машины Node OpenSearch

Перед началом установки основных пакетов установим Java и Unzip, а также создадим пользователей для OpenSearch и Logstash.

1. Установка Java:

apt install openjdk-11-jdk

2. Установка unzip:

apt install unzip

3. Создание пользователя opensearch:

groupadd opensearch
useradd opensearch -g opensearch -M -s /bin/bash
passwd opensearch

4. Создание пользователя logstash:

groupadd logstash
useradd logstash -g logstash -M -s /bin/bash
passwd logstash

2.2. Установка OpenSearch (аналог ElasticSearch)

OpenSearch, как можно было заметить, является ядром всего стэка. OpenSearch содержит базу данных, в которой будут храниться логи. Кроме того, OpenSearch имеет API для обработки запросов как на ввод данных, так и на вывод. Так же OpenSearch индексирует поступившие в неё данные и осуществляет поиск по этим данным.

Приступаем к установке.

1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать OpenSearch версии 1.2.4 (https://artifacts.opensearch.org/releases/bundle/opensearch/1.2.4/opensearch-1.2.4-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.

2. Даем права на выполнение для архива:

chmod +x opensearch-1.2.4-linux-x64.tar.gz

3. Распаковываем архив:

tar -xf opensearch-1.2.4-linux-x64.tar.gz

4. Будем устанавливать OpenSearch в каталог «/opt/opensearch», поэтому создаем рабочий каталог для OpenSearch:

mkdir /opt/opensearch

5. Переносим распакованные данные в рабочий каталог:

mv ./opensearch-1.2.4/* /opt/opensearch

6. Удаляем каталог, оставшийся от распаковки:

rmdir ./opensearch-1.2.4

7. Делаем пользователя opensearch владельцем рабочего каталога OpenSearch:

chown -R opensearch:opensearch /opt/opensearch

8. Запускает установочный скрипт от имени пользователя opensearch:

sudo -u opensearch /opt/opensearch/opensearch-tar-install.sh

Дожидаемся сообщения «Node ‘server-elk01’ initialized» и нажимаем «Ctrl+C».

9. Создаем файл демона для работы OpenSearch:

nano /lib/systemd/system/opensearch.service

/lib/systemd/system/opensearch.service


[Unit]
Description=Opensearch
Documentation=https://opensearch.org/docs/latest
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
RuntimeDirectory=opensearch
PrivateTmp=true

Restart=on-failure
RestartSec=60s

WorkingDirectory=/opt/opensearch

User=opensearch
Group=opensearch

ExecStart=/opt/opensearch/bin/opensearch

StandardOutput=journal
StandardError=inherit

# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65535

# Specifies the maximum number of processes
LimitNPROC=4096

# Specifies the maximum size of virtual memory
LimitAS=infinity

# Specifies the maximum file size
LimitFSIZE=infinity

# Not use SWAP
LimitMEMLOCK=infinity

# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0

# Allow a slow startup before the systemd notifier module kicks in to extend the timeout
TimeoutStartSec=75

[Install]
WantedBy=multi-user.target

За основу файла демона я взял файл оригинального демона Elasticsearch и переработал его.

10. Делаем пользователя root владельцем файла демона:

chown -R root:root /lib/systemd/system/opensearch.service

11. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

12. Создаем каталог для логов opensearch:

mkdir /var/log/opensearch

13. Делаем пользователя opensearch владельцем каталога логов:

chown -R opensearch /var/log/opensearch

2.3. Настройки производительности

Для того чтобы получить высокую производительность недостаточно иметь хорошее железо, нужно еще и правильно настроить систему. Попробуем оптимизировать несколько параметров.

1. Настройки Java:
Производятся в файле «/opt/opensearch/config/jvm.options». Необходимо настроить два параметра: «Xmx» и «Xms».

Оба параметра рекомендуется установить в значение равное 50% от имеющейся физической памяти узла. Чем больше доступно памяти, тем лучше, но есть ограничения. Значения параметров «Xmx» и «Xms» не должны превышать значение параметра JVM «compressed object pointers» (по умолчанию равен 32 Gb). И, значения параметров «Xmx» и «Xms» не должны превышать значение параметра JVM «zero-based compressed oops» (по умолчанию равен 26 Gb).

У меня на машине «server-elk01» установлено 8 Gb оперативной памяти поэтому настройки получились такие:

nano /opt/opensearch/config/jvm.options

/opt/opensearch/config/jvm.options

2. Настройка виртуальной памяти:

nano /etc/sysctl.conf

/etc/sysctl.conf


...
# Добавить в конце файла
vm.max_map_count=262144

Для того, чтобы изменения вступили в силу без перезагрузки хоста можно выполнить следующую команду в консоли:

sysctl -w vm.max_map_count=262144

2.4. Первый запуск OpenSearch

На этом этапе уже все готово к тому, чтобы запустить OpenSearch. Глубокой настройкой займемся позже, а сейчас просто запустим OpenSearch.

1. Запустим настроенный нами демон OpenSearch:

systemctl start opensearch

2. Проверим статус запуска демона OpenSearch:

systemctl status opensearch

3. Настроим автозапуск демона OpenSearch:

systemctl enable opensearch

4. Проверим работоспособность демона OpenSearch:

curl -X GET https://localhost:9200 -u 'admin:admin' --insecure

Если вы устанавливаете OpenSearch в ознакомительных целях, то можете считать установку законченной и приступать к использованию OpenSearch, правда только в консольном режиме, Web-интерфейс (OpenSearch-Dashboards) мы еще не устанавливали.

2.5. Смена пароля учетной записи OpenSearch

По умолчанию в OpenSearch предустановлена админская учетная запись «admin» с паролем «admin», поэтому для повышения уровня безопасности сменим пароль этой учетной записи. Для этого выполним следующие шаги.

1. Остановим демон OpenSearch:

systemctl stop opensearch

2. Дадим права на выполнение скрипта получения хэша пароля:

chmod +x /opt/opensearch/plugins/opensearch-security/tools/hash.sh

3. Запускаем скрипт и вводим новый пароль «yN-3L(GMmAAw»:

/opt/opensearch/plugins/opensearch-security/tools/hash.sh

Запоминаем полученный хэш, например:

$2y$12$OCvWNlMu8VbOarfdXdcjPOnHarqktJIcTYjwoykXdaJJfjcCTmfXO

К слову, я указал реальный хэш к описанному паролю, в тестах можно их использовать.

4. Заменим хэш пароля пользователя admin:

nano /opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml

/opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml

...
admin:
  hash: "$2y$12$OCvWNlMu8VbOarfdXdcjPOnHarqktJIcTYjwoykXdaJJfjcCTmfXO"
...

5. Дадим права на выполнение скрипта для создания новых сертификатов:

chmod +x /opt/opensearch/plugins/opensearch-security/tools/securityadmin.sh

6. Перейдём в каталог со скриптом:

cd /opt/opensearch/plugins/opensearch-security/tools

7. Запустим демон OpenSearch:

systemctl start opensearch

8. Запускаем скрипт для создания новых сертификатов:

./securityadmin.sh -cd ../securityconfig/ -icl -nhnv \
   -cacert ../../../config/root-ca.pem \
   -cert ../../../config/kirk.pem \
   -key ../../../config/kirk-key.pem

9. Проверяем работоспособность OpenSearch с новым паролем:

curl -X GET https://localhost:9200 -u 'admin:yN-3L(GMmAAw' --insecure

Аналогичным образом можно сменить пароль и у других учетных записей, которые можно обнаружить в «/opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml», если вам это необходимо.

2.6. Переводим OpenSearch в режим кластера

Как я писал выше в реализуемой схеме мы введем OpenSearch в кластер, но введем только одну ноду. Потому как задачи перед кластером можно поставить разные то и настройки, и схемы могут сильно отличаться. Если вам потребуется только одна машина с OpenSearch, то можно и не вводить её в кластер, тогда этот шаг можно пропустить. А мы приступим.

1. Остановим демон OpenSearch:

systemctl stop opensearch

2. Добавляем параметры в настройки OpenSearch:

nano /opt/opensearch/config/opensearch.yml

/opt/opensearch/config/opensearch.yml

# ------------------------------------ Node ------------------------------------
# Имя ноды:
node.name: os-node01-server-elk01
# Роли узла:
node.roles: [ master, data ]
#
# ---------------------------------- Network -----------------------------------
# Адрес узла - принимать на любых адресах:
network.host: 0.0.0.0
# Порт:
http.port: 9200
#
# ---------------------------------- Cluster -----------------------------------
# Имя кластера:
cluster.name: os_cluster
# Список узлов в голосовании по выбору master узла:
cluster.initial_master_nodes: ["os-node01"]
#
# --------------------------------- Discovery ----------------------------------
# Список master узлов кластера:
discovery.seed_hosts: ["10.0.0.70"]
#
# ----------------------------------- Paths ------------------------------------
# Директория с данными:
path.data: /opt/opensearch/data
# Директория с логами:
path.logs: /var/log/opensearch
#
######## Start OpenSearch Security Demo Configuration ########
# WARNING: revise all the lines below before you go into production
plugins.security.ssl.transport.pemcert_filepath: esnode.pem
plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem
plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem
plugins.security.ssl.transport.enforce_hostname_verification: false
plugins.security.ssl.http.enabled: true
plugins.security.ssl.http.pemcert_filepath: esnode.pem
plugins.security.ssl.http.pemkey_filepath: esnode-key.pem
plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem
plugins.security.allow_unsafe_democertificates: true
plugins.security.allow_default_init_securityindex: true
plugins.security.authcz.admin_dn:
  - CN=kirk,OU=client,O=client,L=test, C=de

plugins.security.authcz.type: internal_opensearch
plugins.security.enable_snapshot_restore_privileges: true
plugins.security.check_snapshot_restore_write_privileges: true
plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"]
plugins.security.system_indices.enabled: true
plugins.security.system_indices.indices: [".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opendistro-notifications-*", ".opendistro-notebooks", ".opendistro-observability", ".opendistro-asynchronous-search-response*", ".opendistro-metadata-store"]
node.max_local_storage_node: 3
######## End OpenSearch Security Demo Configuration ########

Обращаем внимание на роль ноды (master, data). Роль «master» — означает, что нода управляет кластером. Роль «data» — означает, что нода содержит базу данных.

3. Запускаем демон OpenSearch:

systemctl start opensearch

4. Проверим работоспособность OpenSearch:

curl -X GET https://localhost:9200 -u 'admin:yN-3L(GMmAAw' --insecure

5. Проверим состояние кластера:

curl -X GET https://localhost:9200/_cluster/health?pretty -u 'admin:yN-3L(GMmAAw' --insecure

6. Выясним кто мастер в кластере:

curl -X GET https://localhost:9200/_cat/master?pretty -u 'admin:yN-3L(GMmAAw' --insecure

2.7. Установка OpenSearch-Dashboards (аналог Kibana)

OpenSearch-Dashboards является Web-интерфейсом для OpenSearch. Для тех кто собирается работать с OpenSearch из консоли этот шаг можно и пропустить. Так же вместо OpenSearch-Dashboards в качестве Web-интерфейса можно установить и другие утилиты, например, «Grafana».
Приступим к установке OpenSearch-Dashboards.

1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать OpenSearch-Dashboards версии 1.2.0 (https://artifacts.opensearch.org/releases/bundle/opensearch-dashboards/1.2.0/opensearch-dashboards-1.2.0-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.

2. Даем права на выполнение для архива:

chmod +x opensearch-dashboards-1.2.0-linux-x64.tar.gz

3. Распаковываем архив:

tar -xf opensearch-dashboards-1.2.0-linux-x64.tar.gz

4. Будем устанавливать OpenSearch-Dashboards в каталог «/opt/opensearch-dashboards», поэтому создаем рабочий каталог для OpenSearch-Dashboards:

mkdir /opt/opensearch-dashboards

5. Переносим распакованные данные в рабочий каталог:

mv ./opensearch-dashboards-1.2.0-linux-x64/* ./opensearch-dashboards-1.2.0-linux-x64/.* /opt/opensearch-dashboards/

6. Удаляем каталог, оставшийся от распаковки:

rmdir ./opensearch-dashboards-1.2.0-linux-x64

7. Делаем пользователя opensearch владельцем рабочего каталога OpenSearch-Dashboards:

chown -R opensearch:opensearch /opt/opensearch-dashboards

По умолчанию OpenSearch-Dashboards доступен по порту 5601. Можно изменить порт в настройках OpenSearch-Dashboards, однако тогда придется запускать сервис под учетной записью root.

Поэтому оставим порт по умолчанию и позже настроим переадресацию через NGINX.

Для информации, для смены порта по умолчанию необходимо внести следующие изменения:

/opt/opensearch-dashboards/config/opensearch_dashboards.yml

nano /opt/opensearch-dashboards/config/opensearch_dashboards.yml

...
# Добавить в конец файла
server.port: 80
server.host: 10.0.0.70

8. Создаем файл демона для работы OpenSearch-Dashboards:

nano /lib/systemd/system/opensearch_dashboards.service

/lib/systemd/system/opensearch_dashboards.service


[Unit]
Description=Opensearch_dashboards
Documentation=https://opensearch.org/docs/latest
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
RuntimeDirectory=opensearch_dashboards
PrivateTmp=true

WorkingDirectory=/opt/opensearch-dashboards

User=opensearch
Group=opensearch

ExecStart=/opt/opensearch-dashboards/bin/opensearch-dashboards

StandardOutput=journal
StandardError=inherit

# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65535

# Specifies the maximum number of processes
LimitNPROC=4096

# Specifies the maximum size of virtual memory
LimitAS=infinity

# Specifies the maximum file size
LimitFSIZE=infinity

# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0

# Allow a slow startup before the systemd notifier module kicks in to extend the timeout
TimeoutStartSec=75

[Install]
WantedBy=multi-user.target

9. Делаем пользователя root владельцем файла демона:

chown -R root:root /lib/systemd/system/opensearch_dashboards.service

10. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

11. Запустим настроенный нами демон OpenSearch-Dashboards:

systemctl start opensearch_dashboards

12. Проверим статус запуска демона OpenSearch-Dashboards:

systemctl status opensearch_dashboards

13. Настроим автозапуск демона OpenSearch-Dashboards:

systemctl enable opensearch_dashboards

2.8. Установка NGINX и настройка переадресации порта OpenSearch-Dashboards

OpenSearch-Dashboards работает на порту 5601, но это неудобно, так как портом для Web-ресурсов по умолчанию является порт 80. Мы установим NGINX и настроим его так, чтобы он слушал 80 порт и перенаправлял с него все запросы на порт 5601. То есть NGINX будет работать в режиме прокси. Приступаем.

1. Установим Nginx на хост:

apt install nginx

2. Настроим автозапуск Nginx:

systemctl enable nginx

3. Не будем удалять, а сделаем резервную копию настроек Nginx по умолчанию:

cp /etc/nginx/sites-enabled/default /etc/nginx/sites-enabled/default.sav

4. Настроим переадресацию OpenSearch-Dashboards с порта 5601 на 80:

nano /etc/nginx/sites-enabled/default

/etc/nginx/sites-enabled/default


map $http_upgrade $connection_upgrade { # WebSocket support
        default upgrade;
        '' '';
    }

server {
    listen 80;
    server_name server-elk01 server-elk01.domain.ad 10.0.0.70;

    location / {
         proxy_pass http://127.0.0.1:5601; # full internal address
         proxy_http_version  1.1;
         proxy_set_header    Host $server_name:$server_port;
         proxy_set_header    X-Forwarded-Host $http_host;    # necessary for proper absolute redirects and TeamCity CSRF check
         proxy_set_header    X-Forwarded-Proto $scheme;
         proxy_set_header    X-Forwarded-For $remote_addr;
         proxy_set_header    Upgrade $http_upgrade; # WebSocket support
         proxy_set_header    Connection $connection_upgrade; # WebSocket support
    }
}

5. Активируем новые настройки Nginx без перезагрузки демона:

nginx -s reload

В случае неудачи перезагрузим Nginx:

service nginx reload

2.9. Настройка файрволла на запрет доступа по порту 5601

После того как мы настроили на NGINX переадресацию с 80 порта на порт 5601 сам порт 5601 продолжит принимать запросы. На этом шаге мы закроем порт 5601 файерволлом. Это, конечно, не обязательно, но, я считаю, что так будет больше порядка.

Само правило для файрволла не сложное, а вариантов как сделать так, чтобы после перезагрузки системы правило активировалось вновь можно придумать много. Я пойду по пути создания демона, который будет запускать bash-скрипт «/etc/rc.local». А сам скрипт будет содержать команду добавления правила в файрволл. Приступим.

1. Правило привяжем к сетевому интерфейсу, поэтому сначала необходимо узнать название сетевого интерфейса, для этого воспользуемся командой:

ip link show

или командой:

ifconfig

В моей системе сетевой интерфейс имеет имя «ens160».

2. Создадим Bash-скрипт с правилом файрволла:

nano /etc/rc.local

/etc/rc.local


#!/bin/bash
iptables -A INPUT -i ens160 -p tcp --dport 5601 -j DROP

exit 0

Соответственно имя интерфейса нужно изменить на имя своего интерфейса.

3. Дадим скрипту права на выполнение:

chmod +x /etc/rc.local

4. Делаем пользователя root владельцем скрипта:

chown -R root:root /etc/rc.local

5. Создадим файл демона для запуска скрипта:

nano /etc/systemd/system/rc-local.service

/etc/systemd/system/rc-local.service


[Unit]
 Description=/etc/rc.local Compatibility
 ConditionPathExists=/etc/rc.local

[Service]
 Type=forking
 ExecStart=/etc/rc.local start
 TimeoutSec=0
 StandardOutput=tty
 RemainAfterExit=yes
 SysVStartPriority=99

[Install]
 WantedBy=multi-user.target

6. Зададим права на файл демона:

chmod 644 /etc/systemd/system/rc-local.service

7. Делаем пользователя root владельцем файла демона:

chown -R root:root /etc/systemd/system/rc-local.service

8. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

9. Запустим настроенный нами демон rc-local:

systemctl start rc-local

10. Проверим статус запуска демона rc-local:

systemctl status rc-local

11. Настроим автозапуск демона rc-local:

systemctl enable rc-local

2.10. Установка Logstash-oss-with-OpenSearch-output-plugin

Logstash в реализуемой схеме выполняет несколько задач. Во-первых, он как насос будет постоянно обращаться к Kafka и втягивать все новые сообщения, которые будут накапливаться в Kafka. Во-вторых, получаемые сообщения он будет преобразовывать правилами, которые мы в него позже заложим. И в-третьих, он будет раскладывать преобразованные сообщения по разным индексам в OpenSearch. Приступим к установке.

1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать Logstash-oss-with-OpenSearch-output-plugin версии 7.16.2 (https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.
2. Даем права на выполнение для архива:

chmod +x logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz

3. Распаковываем архив:

tar -xf logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz

4. Будем устанавливать Logstash-oss-with-OpenSearch-output-plugin в каталог «/opt/logstash», поэтому создаем рабочий каталог для Logstash-oss-with-OpenSearch-output-plugin:

mkdir /opt/logstash

5. Переносим распакованные данные в рабочий каталог:

mv ./logstash-7.16.2/* /opt/logstash/

6. Удаляем каталог, оставшийся от распаковки:

rmdir ./logstash-7.16.2

7. Делаем пользователя logstash владельцем рабочего каталога Logstash-oss-with-OpenSearch-output-plugin:

chown -R logstash:logstash /opt/logstash

8. Создаем файл демона для работы Logstash-oss-with-OpenSearch-output-plugin:

nano /etc/systemd/system/logstash.service

/etc/systemd/system/logstash.service


[Unit]
Description=logstash

[Service]
Type=simple
User=logstash
Group=logstash

ExecStart=/opt/logstash/bin/logstash "--path.settings" "/opt/logstash/config"
Restart=always
WorkingDirectory=/opt/logstash
Nice=19
LimitNOFILE=16384

# When stopping, how long to wait before giving up and sending SIGKILL?
# Keep in mind that SIGKILL on a process can cause data loss.
TimeoutStopSec=75

[Install]
WantedBy=multi-user.target

Отмечу, что в файле демона оригинального Logstash от Elastic отсутствует параметр «TimeoutStopSec=75». И у меня были случаи, когда при остановке демон Logstash зависал, и приходилось искать java-процесс и убивать его. Параметр «TimeoutStopSec=75» принудительно завершит выполнение демона, если через 75 секунд после команды «stop» процесс не завершится.

9. Зададим права на файл демона:

chmod 644 /etc/systemd/system/logstash.service

10. Делаем пользователя root владельцем файла демона:

chown -R root:root /etc/systemd/system/logstash.service

11. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

12. Зададим путь к базе данных и логам Logstash-oss-with-OpenSearch-output-plugin:

nano /opt/logstash/config/logstash.yml

/opt/logstash/config/logstash.yml

path.data: /opt/logstash/data
pipeline.ordered: auto
path.logs: /var/log/logstash

13. Зададим путь для чтения файлов pipelines:

nano /opt/logstash/config/pipelines.yml

/opt/logstash/config/pipelines.yml

- pipeline.id: man
  path.config: "/opt/logstash/config/conf.d/*.conf"

14. Создадим каталог для файлов pipelines:

mkdir /opt/logstash/config/conf.d

15. Создадим каталог для логов:

mkdir /var/log/logstash

16. Делаем пользователя logstash владельцем каталога логов:

chown -R logstash /var/log/logstash

17. На этом этапе следовало бы сконфигурировать файлы pipelines для обработки данных поступающих из kafka и уходящих в OpenSearch, но я опишу это позже в нескольких кейсах и предоставлю итоговые файлы конфигураций.
18. После окончания настройки еще раз делаем пользователя logstash владельцем всех файлов в рабочем каталоге Logstash-oss-with-OpenSearch-output-plugin:

chown -R logstash:logstash /opt/logstash

19. Запустим настроенный нами демон Logstash-oss-with-OpenSearch-output-plugin:

systemctl start logstash

20. Проверим статус запуска демона Logstash-oss-with-OpenSearch-output-plugin:

systemctl status logstash

21. Настроим автозапуск демона Logstash-oss-with-OpenSearch-output-plugin:

systemctl enable logstash

3. Установка Beat’ов

Beat’ы это своего рода агенты, которые устанавливаются на каждом сервер. Они следят за определенными объектами, в нашем случае за файлами с логами, и когда заметят, что в файле произошли изменения они (Beat’ы) сформируют сообщение, которое будет содержать эти изменения, и отправят их (сообщения), в нашем случае, в Kafka.

Сразу стоит отметить, что Elastic выпускает две версии beat’ов: (условно) «платные» (Filebeat, Winlogbeat и т.д.) и (условно) «бесплатные» (Filebeat-OSS-only, Winlogbeat-OSS-only и т.д.). На сколько я понял «платные» работают так же, как и «бесплатные», за исключением того, что в «платных» можно подключить платный функционал «X-Pack». То есть «платные» обладают расширенными возможностями, которые можно и не покупать, и при этом не обладают временным ограничением действия. Поэтому далее я буду использовать Filebeat и Winlogbeat, но без платного функционала «X-Pack».

3.1. Установка Filebeat на Linux

1. Переходим на официальный сайт Elastic в раздел Filebeat (https://www.elastic.co/downloads/beats/filebeat) и скачиваем архив deb нужной версии. В этой статье я буду использовать Filebeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-amd64.deb).
Я буду настраивать Filebeat на той же машине, где развернут OpenSearch, поэтому после скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.

2. Даем права на выполнение для архива:

chmod +x ./filebeat-7.12.1-amd64.deb

3. Установим Filebeat:

dpkg -i ./filebeat-7.12.1-amd64.deb

4. На этом этапе следовало бы сконфигурировать файл «/etc/filebeat/filebeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.

5. Подключаем модуль system в filebeat:

filebeat modules enable system

6. Подключаем модуль kafka в filebeat:

filebeat modules enable kafka

7. Проверим список подключенных модулей:

filebeat modules list

8. Запустим демон Filebeat:

systemctl start filebeat

9. Проверим статус запуска демона Filebeat:

systemctl status filebeat

10. Настроим автозапуск демона Filebeat:

systemctl enable filebeat

3.2. Установка Filebeat на Windows

1. Переходим на официальный сайт Elastic в раздел Filebeat (https://www.elastic.co/downloads/beats/filebeat) и скачиваем архив zip нужной версии. В этой статье я буду использовать Filebeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-windows-x86_64.zip).

2. Будем устанавливать Filebeat в каталог «C:\filebeat», поэтому после скачивания перенесите архив на сервер «server-windows01» и распакуйте его в каталог «C:\filebeat».

3. На этом этапе следовало бы сконфигурировать файл «C:\filebeat\filebeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.

4. Для создания службы нужно выполнить следующий скрипт с правами администратора:

powershell "C:\filebeat\install-service-filebeat.ps1"

5. Для запуска Filebeat без службы можно использовать команды:

cd C:\filebeat\
filebeat -e -c filebeat.yml

3.3. Установка Winlogbeat на Windows

1. Переходим на официальный сайт Elastic в раздел Winlogbeat (https://www.elastic.co/downloads/beats/winlogbeat) и скачиваем архив zip нужной версии. В этой статье я буду использовать Winlogbeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-7.12.1-windows-x86_64.zip).

2. Будем устанавливать Winlogbeat в каталог «C:\winlogbeat», поэтому после скачивания перенесите архив на сервер «server-windows01» и распакуйте его в каталог «C:\winlogbeat».

3. На этом этапе следовало бы сконфигурировать файл «C:\winlogbeat\winlogbeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.

4. Для создания службы нужно выполнить следующий скрипт с правами администратора:

powershell "C:\winlogbeat\install-service-winlogbeat.ps1"

5. Для запуска Winlogbeat без службы можно использовать команды:

cd C:\winlogbeat\
winlogbeat -e -c winlogbeat.yml

4. Настройка на примере нескольких кейсов

Рассмотрим несколько ситуаций, в разрезе разных форматов собираемых данных и в разрезе разных Beat’ов, которые будут собирать данные. Посмотрим, как выглядят данные, которые нам предстоит собрать. Какой Beat их будет собирать. Как следует настроить Beat в каждой ситуации. И как следует настроить Logstash в каждой ситуации.

4.1. Сценарий сбора данных в формате json

4.1.1. Исходные данные

Файл (файлы) логов содержит сообщения в виде списка json объектов. Формат самого файла текстовый (не json), то есть квадратные скобки в начале и в конце файла не нужны.
Файл дописывается новыми сообщениями по мере необходимости.

Пример:

«C:\Temp\test_log_json2\2022-03-15.log»

{
    "logId":  "ID_1",
    "uniqueSubId":  "001",
    "mess":  "test_text"
}

{
    "logId":  "ID_2", "uniqueSubId":  "002", "mess":  "test_text2"
}

{
    "logId":  "ID_3",
    "uniqueSubId":  "003",
    "mess":  "test_text3"
}

Зачем нужно добавлять пустые строки после каждого json-объекта я опишу далее в разделе «Актуальные проблемы».

4.1.2. Процесс сбора данных через Filebeat

Служба Filebeat будет периодически проверять файл на наличие изменений. При обнаружении изменений эти изменения будут отправлены в Topic Kafka. На выходе в начало сообщения добавим метку времени отправки данных с сервера. Во входящее сообщение добавим тэг «test-json», а на выходе будем отправлять в Topic Kafka только те сообщения, которые содержат этот тэг. Так мы промаркируем сообщения. Маркировка сообщений становится актуальна, когда входящих и исходящих потоков несколько. Я добавил маркировку сообщения в этом кейсе для полноты конфигурации.

Отдельного внимания заслуживает параметр «codec.format». Если его не указывать, то, по умолчанию, отправляемое сообщение будет в формате json и будет содержать целый набор полей, которые содержат в основном техническую (справочную) информацию (имя и версию ОС, имя и версию Beat’а и т.д.). Меня же интересует только основное сообщение, которое содержится в поле «message», остальные данные я буду считать объективно известными, за счет того, что все потоки данных разделены на отдельные Topic’и Kafka.

Полный список полей, отправляемых в сообщении с настройками по умолчанию: «event», «log», «message», «@timestamp», «@metadata», «ecs», «agent», «host», «service», «input», «fileset».

Пример конфигурации Filebeat:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - C:\Temp\test_log_json2\*
  multiline.pattern: '^{'
  multiline.negate: true 
  multiline.match: after
  processors:
  - decode_json_fields:
      fields: ["message"]
      target: "json"
  tags: ["test-json"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-test-json"
     when.contains:
       tags: "test-json"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

4.1.3. Процесс обработки данных в Logstash

Демон Logstash при обнаружении новых данных в Topic’е Kafka начинает их прием и обработку. Данные внутри Logstash поступают сначала в модуль input, потом поступают в модуль filter и затем поступают в модуль output.

В модуле input примем данные из Topic’а Kafka и добавим в сообщение поле «type», которое будет содержать название Topic’а.

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-test-json"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-test-json"
    }
}

В модуле filter обрабатываются только те сообщения, в которых есть поле «type», значение которого равно имени Topic’а Kafka. Из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_filebeat». Декодируем основное сообщение из формата json, при этом все заголовки ключей станут одноименными полями, а значения ключей перейдут в значения полей. Добавим поле «timestamp_logstash», для фиксирования времени, в которое проводилась обработка сообщения.

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
if [type] == "server-windows01-test-json" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    json {
      source => "message"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

В модуле output обрабатываются только те сообщения, в которых есть поле «type», значение которого равно имени Topic’а Kafka. Каждое сообщение отправляется в OpenSearch в индекс, соответствующий текущей дате, имени Topic’а и префикса «Kafka».

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-test-json" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-test-json-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.2. Сценарий сбора однострочных данных в текстовом формате

4.2.1. Исходные данные

Каждая строка файла будет извлекаться как новое сообщение и всё содержимое строки будет помещено в поле «message» как текст. В Filebeat такой сценарий работает по умолчанию. Нам остается только промаркировать сообщения и отправить их в нужный Topic Kafka.

Если содержание всех строк стандартизировано, то в Logstash на этапе Filter можно разложить сообщение на заранее известные поля. В этом примере мы этого делать не будем.

Пример:

«C:\Temp\log\2022-03-16.log»

[2022.03.16 13:01:09] Info | Just text
Next text
[2022.03.16 13:02:01] Info | New text

4.2.2. Процесс сбора данных через Filebeat

Процесс сбора данных аналогичен ситуации сбора данных в формате json за исключением того, что нам не нужно парсить json, так же изменим тэг для маркировки входящих сообщений, и изменим Topic Kafka.

Пример конфигурации Filebeat:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
- type: filestream
  paths:
    - C:\Temp\log\*.log
  tags: ["simple-logs"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-simple-logs"
     when.contains:
       tags: "simple-logs"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

4.2.3. Процесс обработки данных в Logstash

Процесс обработки данных в Logstash будет незначительно отличаться от сценария для ситуации с json-объектами, поэтому я приведу только примеры конфигураций.

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-simple-logs"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-simple-logs"
    }
}

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-simple-logs" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-simple-logs" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-simple-logs-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.3. Сценарий сбора Логов IIS

Для сбора логов IIS в Filebeat предусмотрен отдельный модуль, который так и называется IIS. В IIS логи делятся на две категории: «ACCESS» и «ERROR». У меня пути к этим логам настроены по умолчанию.

4.3.1. Процесс сбора данных через Filebeat

Для того, чтобы промаркировать эти две категории («ACCESS» и «ERROR») и направить их в разные Topic’и Kafka я использовал параметр «@metadata.pipeline».

Так же стоит обратить внимание на параметры «var.paths», по какой-то причине в путях необходимо использовать обратные слэши.

Пример конфигурации Filebeat:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
filebeat.modules:
  - module: iis
    access:
      enabled: true
      var.paths: ["C:/inetpub/logs/LogFiles/*/*.log"]
    error:
      enabled: true
      var.paths: ["C:/Windows/System32/LogFiles/HTTPERR/*.log"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "rgmtpaydox39-iis-access"
     when.contains:
       "@metadata.pipeline": "iis-access"
   - topic: "rgmtpaydox39-iis-error"
     when.contains:
       "@metadata.pipeline": "iis-error"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

4.3.2. Процесс обработки данных в Logstash

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-iis-access"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-access"
    }
    kafka {
      type => "server-windows01-iis-error"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-error"
    }
}

Во время обработки «iis-access» из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_filebeat». Затем из полученного сообщения снова вырежем метку времени из начала сообщения и добавим её в новое поле «timestamp_windows». Затем преобразуем тип поля «timestamp_windows» из строкового в тип «дата». Это позволит в дальнейшем в OpenSearch-Dashboards создавать шаблоны индексов с фильтрацией по этому полю. Затем добавим еще одно поле «timestamp_logstash».

За счет наличия трех меток времени мы сможем точно отследить, когда событие возникло в windows, когда попало в обработку Filebeat и когда попало в обработку Logstash.

Обработка «iis-error» осуществляется аналогичным образом.

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-iis-access" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-iis-error" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-iis-access" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-access-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-iis-error" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-error-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.4. Сценарий сбора системных Логов Windows

4.4.1. Процесс сбора данных через Winlogbeat

Системные логи Windows будем собирать с помощью Winlogbeat. Доступны 3 вида логов: «Application», «Security» и «System».

Winlogbeat имеет особенность в отличие от Filebeat. Подставляемый на выходе параметр «timestamp» будет содержать не время отправки с сервера, а время события (из лога), произошедшего в Windows. Временная метка отправки с сервера будет содержаться в поле «event» в параметре «created» («event.created»). Кроме того, я посчитал еще несколько полей информативными. В итоге на выходе отправляются следующие поля: «@timestamp», «message», «event», «log», «winlog». При этом «@timestamp» расположен в начале сообщения и отделен пробелом. Все остальные поля я разделил символом «|».

Полный список полей, отправляемых в сообщении с настройками по умолчанию: «event», «log», «message», «winlog», «@timestamp», «@metadata», «ecs», «agent», «host».

Так же как и в Filebeat вы можете убрать параметр «codec.format» и посмотреть полный список полей и их значений, которые попадут в Logstash в формате json.

К каждому виду логов можно добавить параметр «ignore_older», чтобы игнорировать слишком старые логи. Я добавил этот параметр к «Application» со значением «72h», чтобы игнорировать логи старше 72 часов.

Пример конфигурации Winlogbeat:

«C:\winlogbeat\winlogbeat.yml»

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
    tags: ["server-windows01-application"]

  - name: Security
    tags: ["server-windows01-security"]

  - name: System
    tags: ["server-windows01-system"]

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-application"
     when.contains:
       tags: "server-windows01-application"
   - topic: "server-windows01-security"
     when.contains:
       tags: "server-windows01-security"
   - topic: "server-windows01-system"
     when.contains:
       tags: "server-windows01-system"
 codec.format:
   string: '%{[@timestamp]} %{[message]}|%{[event]}|%{[log]}|%{[winlog]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000

logging.level: info
logging.to_files: true
logging.files:
  path: C:\winlogbeat
  name: winlogbeat.log
  keepfiles: 7

4.4.2. Процесс обработки данных в Logstash

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-application"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-application"
    }
    kafka {
      type => "server-windows01-security"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-security"
    }
    kafka {
      type => "server-windows01-system"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-system"
    }
}

Во время обработки «application» из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_windows». Установим, что остальные поля разделены символом «|», в соответствии с конфигурацией Winlogbeat. Символ «|» необходимо экранировать символом «\».
Названия полей обозначим с таким же именем и в таком же порядке, как и в конфигурации Winlogbeat. Зная, что поле «event» находится в формате json распарсим его как json и поместим в то же поле. С полями «log» и «winlog» поступим аналогично. Так как время обработки сообщения в Winlogbeat находится в поле «event» в параметре «created», то создадим поле «timestamp_winlogbeat» и поместим туда значение из «event.created». После этого создадим поле «timestamp_logstash» с меткой времени обработки сообщения в Logstash.

Сообщения «security» и «system» обрабатываются аналогичным образом.

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-application" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-security" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-system" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-application" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-application-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-security" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-security-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-system" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-system-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.5. Сценарий сбора системных Логов Linux

4.5.1. Процесс сбора данных через Filebeat

Системные логи Linux будем собирать с помощью Filebeat. Конфигурирование Filebeat в Linux практически идентично конфигурированию Filebeat в Windows. За счет того, что при установке Filebeat на Linux мы включили модуль system, можно настроить только вывод данных, сменив его с elasticsearch на kafka, остальные настройки можно оставить по умолчанию и данные пойдут в формате json.

Поскольку системные логи Linux хранятся в одном файле «/var/log/syslog» (архивные не берем в расчет) и формат логов однострочный, то есть каждое сообщение хранится в отдельной строке, то в этой ситуации можно так же применить «Сценарий сбора однострочных данных в текстовом формате». Именно этот вариант я и представлю.

Пример конфигурации Filebeat:

«/etc/filebeat/filebeat.yml»

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/syslog

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
 hosts: ["10.0.0.60:9092"]
 topic: "server-elk01-syslog"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat.log
  keepfiles: 7

4.5.2. Процесс обработки данных в Logstash

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-elk01-input.conf»

input {
    kafka {
      type => "server-elk01-syslog"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-elk01-syslog"
    }
}

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-elk01-filter.conf»

filter {
  if [type] == "server-elk01-syslog" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-elk01-output.conf»

output {
  if [type] == "server-elk01-syslog" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-elk01-syslog-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

5. Итоговые файлы конфигураций

С учетом всех сценариев у меня получились следующие файлы конфигураций.

Пример конфигурации Filebeat на Linux:

«/etc/filebeat/filebeat.yml»

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/syslog

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
 hosts: ["10.0.0.60:9092"]
 topic: "server-elk01-syslog"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat.log
  keepfiles: 7

Пример конфигурации Filebeat на Windows:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
- type: filestream
  paths:
    - C:\Temp\log\*.log
  tags: ["simple-logs"]

- type: log
  enabled: true
  paths:
    - C:\Temp\test_log_json2\*
  multiline.pattern: '^{'
  multiline.negate: true 
  multiline.match: after
  processors:
  - decode_json_fields:
      fields: ["message"]
      target: "json"
  tags: ["test-json"]

filebeat.modules:
  - module: iis
    access:
      enabled: true
      var.paths: ["C:/inetpub/logs/LogFiles/*/*.log"]
    error:
      enabled: true
      var.paths: ["C:/Windows/System32/LogFiles/HTTPERR/*.log"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-simple-logs"
     when.contains:
       tags: "simple-logs"
   - topic: "server-windows01-test-json"
     when.contains:
       tags: "test-json"
   - topic: "server-windows01-iis-access"
     when.contains:
       "@metadata.pipeline": "iis-access"
   - topic: "server-windows01-iis-error"
     when.contains:
       "@metadata.pipeline": "iis-error"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

Пример конфигурации Winlogbeat:

«C:\winlogbeat\winlogbeat.yml»

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
    tags: ["server-windows01-application"]

  - name: Security
    tags: ["server-windows01-security"]

  - name: System
    tags: ["server-windows01-system"]

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-application"
     when.contains:
       tags: "server-windows01-application"
   - topic: "server-windows01-security"
     when.contains:
       tags: "server-windows01-security"
   - topic: "server-windows01-system"
     when.contains:
       tags: "server-windows01-system"
 codec.format:
   string: '%{[@timestamp]} %{[message]}|%{[event]}|%{[log]}|%{[winlog]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000

logging.level: info
logging.to_files: true
logging.files:
  path: C:\winlogbeat
  name: winlogbeat.log
  keepfiles: 7

Пример конфигурации Logstash модуля input для сервера «server-elk01»:

«/opt/logstash/config/conf.d/server-elk01-input.conf»

input {
    kafka {
      type => "server-elk01-syslog"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-elk01-syslog"
    }
}

Пример конфигурации Logstash модуля filter для сервера «server-elk01»:

«/opt/logstash/config/conf.d/server-elk01-filter.conf»

filter {
  if [type] == "server-elk01-syslog" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output для сервера «server-elk01»:

«/opt/logstash/config/conf.d/server-elk01-output.conf»

output {
  if [type] == "server-elk01-syslog" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-elk01-syslog-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

Пример конфигурации Logstash модуля input для сервера «server-windows01»:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-simple-logs"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-simple-logs"
    }
    kafka {
      type => "server-windows01-test-json"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-test-json"
    }
    kafka {
      type => "server-windows01-application"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-application"
    }
    kafka {
      type => "server-windows01-security"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-security"
    }
    kafka {
      type => "server-windows01-system"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-system"
    }
    kafka {
      type => "server-windows01-iis-access"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-access"
    }
    kafka {
      type => "server-windows01-iis-error"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-error"
    }
}

Пример конфигурации Logstash модуля filter для сервера «server-windows01»:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-simple-logs" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-test-json" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    json {
      source => "message"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-application" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-security" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-system" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-iis-access" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-iis-error" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output для сервера «server-windows01»:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-simple-logs" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-simple-logs-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-test-json" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-test-json-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-application" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-application-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-security" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-security-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-system" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-system-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-iis-access" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-access-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-iis-error" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-error-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

6. Эксплуатация

Следующие шаги я разберу на примере сбора данных в формате json.

6.1. Создание шаблона индексов с помощью OpenSearch-Dashboards

Шаблон индексов позволяет просматривать сообщения одного или нескольких индексов сортируя их по дате. В качестве даты может выступать любое поле в формате «дата». Для того чтобы можно было осуществлять поиск с помощью OpenSearch-Dashboards необходимо создать шаблон индексов. Для этого перейдем в раздел «Stack Management», далее в раздел «Index Pattern» и нажмем кнопку «Create index pattern» (Рис.2).

image

Рис.2

Создадим шаблон индексов для данных из Topic’а Kafka без учета даты. Для этого введем в строке имени шаблона название индекса, но в конце, вместо даты введем символ «*» (Рис.3).

image

Рис.3

Параметром сортировки данных в шаблоне выберем «@timestamp», он совпадает с «timestamp_logstash» (Рис.4).

image

Рис.4

6.2. Отображение данных в OpenSearch-Dashboards

При переходе в раздел «Discover» в соответствующий шаблон индексов там отобразятся доступные сообщения. Можно раскрыть подробное содержание сообщения и увидеть, что все заголовки ключей json-сообщения стали одноименными полями, а значения ключей перешли в значения полей. Так же осталось поле «message», которое содержит изначальное json-сообщение (Рис.5). На этапе фильтрации сообщения в Logstash это поле («message») можно удалять, если в нем нет необходимости.

Если в json-сообщении содержится поле «message», то оригинальное поле «message», которое содержит изначальное json-сообщение, будет перезаписано на поле из json-сообщения.

image

Рис.5

6.3. Поиск в OpenSearch-Dashboards

Я рассмотрю совсем немного вариантов поиска, исключительно с целью обзора. Поиск будет только в формате «DQL» (формат по умолчанию).

Поиск осуществляется в разделе «Discover» в соответствующем шаблоне индексов.
1. Для поиска по всем доступным сообщениям во всех полях достаточно ввести нужную фразу в строке поиска (Рис.6).

image

Рис.6

2. Для поиска фраз по определенным полям в поисковой строке указывается имя нужного поля, затем, в качестве разделителя, идет символ двоеточия «:» и затем искомая фраза. Искомая фраза может содержать специальные символы «*», «?» и т.д. для формирования шаблона (Рис.7).

image

Рис.7

3. В поисковом запросе можно применять логические операции «or», «and», «not» (Рис.8).

image

Рис.8

6.4. Работа с OpenSearch через консоль (без OpenSearch-Dashboards)

Я приведу не большой список запросов для того, чтобы показать, как выглядит работа из консоли.
Запросить список индексов:

curl 'https://10.0.0.70:9200/_cat/indices?v&pretty' -u 'admin:yN-3L(GMmAAw' –insecure

Запросить структуру индекса:

curl 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_mapping?pretty'  -u 'admin:yN-3L(GMmAAw' --insecure

Удалить индекс:

curl -XDELETE 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30?pretty' -u 'admin:yN-3L(GMmAAw' –insecure

Вывести первые 10 сообщений из индекса (10 по умолчанию):

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Вывести первые 20 сообщений из индекса:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?size=20&pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Вывести первые 20 сообщений из шаблона индексов:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-*/_search?size=20&pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Осуществить поиск в индексе по запросу «*text*» через параметр «q»:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?q=*text*&pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Для поиска по составным запросам существуют несколько типов поиска:

term — точное совпадение искомой строки со строкой в индексе или термом;
match — все слова должны входить в строку, в любом порядке;
match_phrase — вся фраза должна входить в строку;
query_string — все слова входят в строку в любом порядке, можно искать по нескольким полям, используя регулярные выражения.

Осуществим поиск в индексе по полю «mess», искать будем точное совпадение фразы «test_text»:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?pretty ' -u 'admin:yN-3L(GMmAAw' --insecure -H 'Content-Type: application/json' -d '{"query" : {"term" : {"mess" : "test_text"}}}'

7. Актуальные проблемы

В этом разделе я опишу проблемы, с которыми я встретился.

7.1. Имена Topic’ов Kafka не должны содержать большие буквы

Logstash не может читать Topic’и Kafka, если они (Topic’и) содержат большие буквы. Поэтому имена Topic’ов Kafka не должны содержать большие буквы.

7.2. Filebeat некорректно создает Topic’и Kafka

При старте как Filebeat, так и Logstash создают Topic в Kafka, если в конфигурации соответствующего сервиса Topic имеется, а в Kafka он еще отсутствует. Но если Topic создает Filebeat, то Logstash не может читать этот топик. Если же Topic создает Logstash, то все сервисы могут читать этот Topic. Поэтому при создании/изменении конфигурации обеих служб сначала необходимо запустить Logstash и дождаться пока Topic в Kafka будет создан. И только после этого можно запускать Filebeat.

7.3. Требуется переход на следующую строку после json сообщения

При считывании сообщений из логов в формате json, после закрывающей фигурной скобки «}», которая закрывает json, в том случае, когда это последнее сообщение в файле, необходимо добавить переход на следующую строку. Иначе эта фигурная скобка «}» не попадет в обработку json и последнее json-сообщение в файле обрабатывается не корректно (не читается как json).

7.4. Необходимо обращать внимание на кодировку log-файлов

По умолчанию корректно обрабатываются log-файлы только в формате «UTF-8». Поэтому необходимо либо создавать log-файл в формате «UTF-8», либо явно указываться кодировку в Logstash и/или в Filebeat.

Например, я столкнулся с тем, что PowerShell по умолчанию формирует файлы в формате «UTF-16». Пришлось сменить кодировку по умолчанию в скрипте на «UTF-8»:

$PSDefaultParameterValues['*:Encoding'] = 'utf8'

8. Послесловие

Что мне хотелось бы добавить в эту статью, но по какой-то причине этого не произошло:

1. Настройка времени жизни индексов в OpenSearch;
2. Настройка Apache Kafka;
3. Настройка аутентификации Active Directory в OpenSearch-Dashboards по протоколу LDAP;
4. Создание визуализаций в OpenSearch-Dashboards;
5. Создание Dashboard’ов в OpenSearch-Dashboards.

На этом я заканчиваю. Всем спасибо за внимание к статье, надеюсь она окажется для кого-то полезной. И оставляйте ваши рецепты настройки OpenSearch в комментариях, думаю это так же будет интересно.

So What Exactly Is Filebeat?

Imagine you’ve got a huge pile of logs scattered all over your servers. You need to collect them, sort them out, and send them to a central place where you can analyze them. That’s basically what Filebeat does. It’s a lightweight shipper for log data, part of the Elastic Stack (ELK Stack: Elasticsearch, Logstash, Kibana). Filebeat helps you gather logs from your servers and forward them to Elasticsearch or Logstash. Pretty handy, right?

So you’re probably wondering, what’s the big deal with Filebeat? Well, it’s efficient and light on resources, making it perfect for keeping an eye on your logs without slowing down your systems.

In this article, we’ll walk through how to install and configure Filebeat. By the end, you’ll be able to set it up and start shipping those logs like a pro.

First things first, there’s a bit of setup involved. But don’t worry, it’s not as complicated as it sounds. Anyway, let’s dive right in…

Getting Started with Filebeat

To get going with Filebeat, you need to install it on your system. Filebeat can be installed on various operating systems, including Linux, Windows, and macOS. We’ll cover the basics for each.

Installing on Linux

If you’re on a Linux system, installing Filebeat is pretty straightforward. Here’s how you can do it:

  1. First, download the Filebeat package from the Elastic website. You can use a command like this:
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.0-amd64.deb
  1. Once downloaded, install the package using the following command:
sudo dpkg -i filebeat-7.10.0-amd64.deb

That’s it! Filebeat should now be installed on your Linux system.

Installing on Windows

For Windows users, the process is a bit different but just as easy. Follow these steps:

  1. Download the Filebeat Windows zip file from the Elastic website.
  2. Extract the contents of the zip file to a directory of your choice.
  3. Open a Command Prompt with administrative privileges and navigate to the directory where you extracted Filebeat.
  4. Run the following command to install Filebeat as a Windows service:
powershell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1

Filebeat should now be installed as a service on your Windows system.

Installing on macOS

If you’re on macOS, you can use Homebrew to install Filebeat. Here’s how:

  1. First, make sure you have Homebrew installed. If not, you can install it by following the instructions on the Homebrew website.
  2. Once Homebrew is installed, you can install Filebeat with the following command:
brew tap elastic/tap
brew install elastic/tap/filebeat-full

And that’s it! Filebeat should now be installed on your macOS system.

Configuring Filebeat

Alright, so you’ve got Filebeat installed. Now it’s time to configure it. The configuration file for Filebeat is usually located in the installation directory. For Linux and macOS, it’s typically found at /etc/filebeat/filebeat.yml. For Windows, it’s in the directory where you extracted Filebeat.

The configuration file is where you’ll specify what logs to collect and where to send them. Here’s a basic example of what the configuration file might look like:

filebeat.inputs:
  • type: log
enabled: true paths: - /var/log/.log output.elasticsearch: hosts: ["localhost:9200"]


This configuration tells Filebeat to collect all log files in the /var/log/ directory and send them to a local Elasticsearch instance running on port 9200.

But wait, there’s more. You can also configure Filebeat to send logs to Logstash. Here’s how you can do that:

output.logstash:
 hosts: ["localhost:5044"]

This tells Filebeat to send the logs to a Logstash instance running on port 5044.

Advanced Configuration Options

Filebeat has a bunch of advanced configuration options that you can use to fine-tune its behavior. For example, you can specify how often Filebeat should check for new log files, or how much data it should send at a time.

Here are a few advanced options you might find useful:

  • scan_frequency: This option specifies how often Filebeat should scan for new log files. The default is 10 seconds.
  • harvester_buffer_size: This option specifies the size of the buffer that Filebeat uses to store log data before sending it. The default is 16384 bytes.
  • close_inactive: This option specifies how long Filebeat should wait before closing a file that hasn’t been updated. The default is 5 minutes.

You can add these options to your configuration file like this:

filebeat.inputs:
  • type: log
enabled: true paths: - /var/log/
.log scan_frequency: 5s harvester_buffer_size: 32768 close_inactive: 10m output.elasticsearch: hosts: ["localhost:9200"]

These options can help you optimize Filebeat’s performance for your specific use case. But anyway, let’s move on…

Testing Your Filebeat Configuration

So you’ve configured Filebeat, but how do you know if it’s working correctly? The best way to test your configuration is to run Filebeat in the foreground and check the output.

On Linux and macOS, you can do this with the following command:

sudo filebeat -e -d "" -c /etc/filebeat/filebeat.yml


On Windows, you can use this command:


filebeat.exe -e -d "" -c .\filebeat.yml

The -e flag tells Filebeat to run in the foreground, and the -d "*" flag enables debug logging. This will give you a lot of information about what Filebeat is doing, which can be helpful for troubleshooting.

If everything is working correctly, you should see log messages indicating that Filebeat is successfully collecting and sending log data. If you encounter any errors, the debug output should help you figure out what’s going wrong.

Common Issues and Troubleshooting

Even with the best intentions, things can go wrong. Here are some common issues you might encounter when setting up Filebeat, and how to troubleshoot them.

Filebeat Isn’t Collecting Logs

If Filebeat isn’t collecting logs, there are a few things you can check:

  • Make sure the paths specified in the configuration file are correct.
  • Check the permissions on the log files and directories. Filebeat needs read access to the log files.
  • Ensure that the scan_frequency option is set to a reasonable value. If it’s set too high, Filebeat might not be scanning for new logs often enough.

Filebeat Isn’t Sending Logs

If Filebeat is collecting logs but not sending them, there are a few things you can check:

  • Make sure the output configuration is correct. Double-check the host and port settings.
  • Check the network connection between Filebeat and the output destination (Elasticsearch or Logstash).
  • Ensure that the output destination is running and accepting connections.

If you’re still having trouble, the debug output can be a big help. Look for any error messages or warnings that might give you a clue about what’s going wrong.

Real-World Use Cases

Alright, so you’ve got Filebeat up and running. But what can you actually do with it? Here are a few real-world use cases to give you some ideas.

Monitoring Web Server Logs

One of the most common use cases for Filebeat is monitoring web server logs. By collecting and analyzing web server logs, you can gain insights into how your website is being used, identify performance bottlenecks, and detect security threats.

For example, you could configure Filebeat to collect Apache or Nginx logs and send them to Elasticsearch. Then, you could use Kibana to visualize the data and create dashboards that show key metrics like request volume, response times, and error rates.

Centralizing Logs from Multiple Servers

If you have multiple servers generating logs, it can be a challenge to keep track of them all. Filebeat can help by centralizing logs from multiple servers in a single location.

For example, you could install Filebeat on all your servers and configure it to send logs to a central Elasticsearch cluster. This would allow you to search and analyze logs from all your servers in one place, making it easier to diagnose issues and gain insights into your infrastructure.

Security Monitoring

Filebeat can also be used for security monitoring. By collecting and analyzing security-related logs, you can detect and respond to security threats in real-time.

For example, you could configure Filebeat to collect logs from your firewall, intrusion detection system, and other security tools. Then, you could use Elasticsearch and Kibana to analyze the data and create alerts for suspicious activity.

Wrapping Up

So that’s it! You now know how to install and configure Filebeat to collect and send log data. Filebeat is a powerful tool that can help you gain insights into your logs and improve your monitoring and security posture.

Remember, the key to success with Filebeat is to start small and gradually expand your configuration as you become more comfortable with the tool. And don’t forget to keep an eye on the debug output – it can be a lifesaver when things go wrong.

Happy logging!

FAQ

What is Filebeat used for?
Filebeat is used for collecting and forwarding log data from servers to a central location like Elasticsearch or Logstash.
Can Filebeat be installed on Windows?
Yes, Filebeat can be installed on Windows by downloading the Windows zip file and following the installation steps.
How do I configure Filebeat to send logs to Elasticsearch?
To configure Filebeat to send logs to Elasticsearch, you need to edit the filebeat.yml configuration file and specify the Elasticsearch host and port under the output.elasticsearch section.
What should I do if Filebeat isn’t collecting logs?
If Filebeat isn’t collecting logs, check the paths specified in the configuration file, ensure Filebeat has read access to the log files, and verify the scan_frequency setting. Also, look at the debug output for any error messages.
Can Filebeat be used for security monitoring?
Yes, Filebeat can be used for security monitoring by collecting and analyzing security-related logs. You can configure Filebeat to collect logs from firewalls, intrusion detection systems, and other security tools, and then use Elasticsearch and Kibana to analyze the data and create alerts for suspicious activity. Personally, I find this really useful for keeping an eye on potential threats.

Learn how to setup Filebeat on Windows Elasticsearch Command line. Our Windows Support team is here to help you with your questions and concerns.

How to Setup Filebeat on Windows Elasticsearch Command line

Filebeat can be described as a lightweight and open-source log shipper. It is provided by Elastic and is to forward log data from different sources to Elasticsearch or other log management systems like Logstash or Kafka.

Elastic Stack is made up of a group of products that access data from any source in any format. Furthermore, it helps search, analyze, as well as visualize it in real time. Elastic Shack is made up of these three main components and the beat family:

  • Elastic Search: Search and analytics engine
  • Kibana: Dashboard to visualize data
  • Logstash: Data processing pipeline

Elasticsearch is a distributed, RESTful search and analytics engine. It deals with a large number of use cases.

In fact, Elasticsearch plays a key role in the heart of the Elastic Stack. It stores user data centrally for search, relevancy, and analytics that is highly scalable.

If you are looking for a guide to help you set up Filebeat on Windows and connect it to Elastic Stack, you have come to the right place. Here is a quick overview of what we will be taking a look at:

  1. Filebeat Installation
  2. Connecting to the Elastic Stack
  3. Log Data Collection
    1. Enabling and Configuring Data Collection Modules
    2. Enabling and Configuring ECS Loggers for Application Logs
    3. Manual Configuration with Filebeat
  4. Setting Up Assets
  5. Starting Filebeat
  6. Viewing Data
  7. Troubleshooting Tips

1. Filebeat Installation

The first step is to install Filebeat on all the servers we want to monitor.

  1. First, download the Filebeat Windows zip file from this link.
  2. Then, it is time to extract the contents of the file into C:\Program Files.
  3. Next, rename the filebeat-version-windows directory to Filebeat.
  4. We have to open a PowerShell prompt as an Administrator and run these commands:
    PS > cd 'C:\Program Files\Filebeat'
    PS C:\Program Files\Filebeat> .\install-service-filebeat.ps1

    However, if script execution is disabled on our system, we have to set the execution policy for the current session so that the script can run.

    For example:

    PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-filebeat.ps1.

2. Connecting to the Elastic Stack

Now, we have to configure connections to Elasticsearch and Kibana to set up Filebeat. This involves setting the connection information in filebeat.yml.

  1. To begin with, locate the configuration file in the Directory layout.
  2. Next, mention the cloud.id of our Elasticsearch Service, and then set cloud.auth to a user authorized to set up Filebeat.

    For instance:

    cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
    cloud.auth: "filebeat_setup:PASSWORD"

    We can see a hard-coded password above. Our experts recommend storing sensitive values in the secrets keystore.

3. Log Data Collection

We can collect log data with Filebeat via one of these methods:

  • Data collection modules

    Filebeat comes with predefined data collection modules that simplify the process of collecting, parsing, and visualizing log data from common sources and formats. These modules are designed to handle specific log types, making it easier for users to set up log collection without extensive configuration.

  • ECS loggers

    ECS is a common data model for logs that provides a consistent and normalized structure for log entries. Filebeat can be configured to structure and format application logs into ECS-compatible JSON. This makes sure that log entries adhere to a standardized schema, making it easier to analyze and correlate data across different applications.

  • Manual Filebeat configuration

    In cases where predefined modules or ECS loggers may not be suitable, we can manually configure Filebeat. This involves specifying input sources, defining how the data should be processed, and specifying the output destination.

3.1 Enabling and Configuring Data Collection Modules

We can easily enable and configure data collection modules as seen below:

  1. First, identify the modules we want to enable. We can get the list of available modules by running:
    DEB RPM MacOS Brew Linux Windows
    PS > .\filebeat.exe modules list
  2. Now, we can enable one or more modules from the installation directory.

    For example, this command enables the nginx module config:

    DEB RPM MacOS Brew Linux Windows
    PS > .\filebeat.exe modules enable nginx

    In the module config under modules.d, we have to change the module settings to match our environment.

    For instance, log locations are set based on the OS. If our logs aren’t in the default locations, we have to set the paths variable as seen here:

    - module: nginx
    access:
    var.paths: ["/var/log/nginx/access.log*"]

3.2 Enabling and Configuring ECS Loggers for Application Logs

Although Filebeat can accept raw, plain-text application logs, our experts recommend structuring our logs. This lets us easily extract fields.

Furthermore, Elastic simplifies this process by offering application log formatters. These plugins format our logs into ECS-compatible JSON. This helps us avoid manually parsing logs.

3.3 Manual Configuration with Filebeat

If we cannot find a module for our file type, or change our application’s log output, we can enable and configure Filebeat manually.

To do this, we have to specify a list of inputs in the filebeat.inputs section of the filebeat.yml. These inputs mention how Filebeat will locate and process input data.

Additionally, the list is a YAML array, hence each input begins with a dash (-) symbol. We can also specify multiple inputs, and specify the same input type more than once.

Here is an example:

Furthermore, each filestream input needs a unique ID for tracking the state of files.

For the most basic configuration, define a single input with a single path. For example:

Here, the input harvests all the files in the path /var/log/*.log. In other words, Filebeat harvests all files in the directory /var/log/ that ends with .log.

Furthermore, we can fetch all files from a predefined level of subdirectories with this pattern: /var/log/*/*.log.

4. Setting Up Assets

Filebeat comes with predefined assets. They help parse, index, and visualize our data. We can load these assets as seen here:

  1. First, ensure the user mentioned in filebeat.yml has permission to set up Filebeat.
  2. Then, run this command from the installation directory:
    DEB RPM MacOS Brew Linux Windows
    PS > .\filebeat.exe setup -e

    Here, -e is optional. It sends output to standard error rather than configured log output.

As seen here, these steps load the recommended index template for writing to Elasticsearch. Additionally, it deploys the sample dashboards to help visualize the data in Kibana.

However, we do not load the ingest pipelines used to parse log lines here. By default, ingest pipelines are set up automatically when we run the module for the first time and connect to Elasticsearch.

5. Starting Filebeat

Before we start Filebeat, we have to change the user credentials in filebeat.yml and mention a user who has permission to publish events.

Then, run these commands to start Filebeat:

DEB RPM MacOS Brew Linux Windows
PS C:\Program Files\filebeat> Start-Service filebeat

Our experts would like to point out that the files are stored at C:\ProgramData\filebeat\Logs by default.

Now, Filebeat will start streaming events to Elasticsearch.

6. Viewing Data

  1. First, log in to our Elastic Cloud account and head to the Kibana endpoint in our deployment.
  2. Then, click Discover in the side navigation.
  3. We can see Filebeat data by making sure the predefined filebeat-* index pattern is selected.

    In case we do not see the data in Kibana, modify the time filter to a larger range. By default, Kibana displays the last fifteen minutes.

  4. Then, click Dashboard and choose the dashboard we want to open.

7. Troubleshooting Tips

If we run into trouble starting filebeat, we can use these commands below to troubleshoot.

First, let’s make sure the configuration file is syntactically correct. We can do this by running filebeat directly inside the terminal.

In case the file is invalid, filebeat will display an error loading config file error message. This message will contain further information about the error and how to fix it.

  • For deb/rpm
    sudo filebeat -e -c /etc/filebeat/filebeat.yml
  • For macOS
    cd EXTRACTED_ARCHIVE
    sudo ./filebeat -e -c filebeat.yml
  • For Windows
    cd EXTRACTED_ARCHIVE
    .\filebeat.exe -e -c filebeat.yml

At the end of the day, collecting Elasticsearch log data is simplified by setting up Filebeat. Let us know in the comments if you run into any trouble during the setup process.

[Need assistance with a different issue? Our team is available 24/7.]

Conclusion

In brief, our Support Experts demonstrated how to set up Filebeat on Windows Elasticsearch Command line.

PREVENT YOUR SERVER FROM CRASHING!

Never again lose customers to poor server speed! Let us help you.

Our server experts will monitor & maintain your server 24/7 so that it remains lightning fast and secure.

GET STARTED

Filebeat – это компонент Elastick Stack, который позволяет отслеживать добавление новых строк в файлы и отправлять их в Logstash или Elasticsearch. Это очень удобно для файлов журналов. Я решил вынести процесс в отдельную публикацию, чтобы сконцентрироваться на том, как выполняется Filebeat в Windows окружении. Это позволит в последующем, например, при подготовке публикации с примера использования Filebeat, не загромождать публикацию информацией об установке.

В последующих публикациях я постараюсь привести примеры использования Filebeat в Windows окружении.

Установка выполняется следующим образом:

1. Загрузите deb/rpm пакет со страницы загрузки:

2. Распакуйте директорию из архива в “C:\Program Files” и переименуйте директорию в “Filebeat”.

3. Запустите консоль PowerShell от имени администратора.

4. Запустите скрипт по установке Winlogbeat:

cd "C:\Program Files\Filebeat"
.\install-service-filebeat.ps1

5. Запустите сервис:

Start-Service filebeat

УУстановка Filebeat в Windows завершена.

Better Stack Team

Updated on November 18, 2024

Running Filebeat on Windows is straightforward. Filebeat is available as a native Windows service, and you can follow these steps to install and configure it:

1. Download Filebeat

  1. Go to the Elastic Downloads page.
  2. Download the Windows .zip file for Filebeat.

2. Install Filebeat

  1. Extract the Zip File:
    • Extract the contents of the .zip file to a directory of your choice, such as C:\\Program Files\\Filebeat.
  2. Configure Filebeat:

    • Navigate to the extracted directory.
    • Open the filebeat.yml configuration file in a text editor.

    Example configuration for Filebeat:

    filebeat.inputs:
      - type: log
        paths:
          - C:\\path\\to\\your\\log\\files\\*.log
    
    output.elasticsearch:
      hosts: ["<http://localhost:9200>"]
    

    Modify the paths to point to the location of your log files and hosts to your Elasticsearch instance.

3. Install Filebeat as a Windows Service

  1. Open a Command Prompt with Administrator privileges.
  2. Navigate to the Filebeat installation directory.
  3. Run the following command to install Filebeat as a service:

4. Start the Filebeat Service

  1. Still in the Command Prompt with Administrator privileges, start the Filebeat service:

    You can also use the Services management console (services.msc) to start and stop the Filebeat service.

5. Verify Installation

  1. Check the status of Filebeat to ensure it’s running correctly:

  2. Monitor the logs for any errors or issues. Log files are typically located in C:\\Program Files\\Filebeat\\logs.

6. Optional: Configure Logging and Debugging

  1. To enable debug logging, modify the filebeat.yml configuration file:

  2. Restart the Filebeat service to apply the changes:

7. Troubleshooting

  • Check Filebeat Logs: Review the Filebeat logs for any errors or warnings. Logs can be found in the logs directory of the Filebeat installation.
  • Firewall and Network Issues: Ensure that there are no firewall rules blocking Filebeat from sending data to Elasticsearch or Logstash.

By following these steps, you should have Filebeat up and running on your Windows system, collecting and forwarding logs as configured.

Got an article suggestion?
Let us know

Explore more

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.

Make your mark

Join the writer’s program

Are you a developer and love writing and sharing your knowledge with the world? Join our guest
writing program and get paid for writing amazing technical guides. We’ll get them to the right
readers that will appreciate them.

Write for us

Writer of the month

Marin Bezhanov

Marin is a software engineer and architect with a broad range of experience working…

Понравилась статья? Поделить с друзьями:
0 0 голоса
Рейтинг статьи
Подписаться
Уведомить о
guest

0 комментариев
Старые
Новые Популярные
Межтекстовые Отзывы
Посмотреть все комментарии
  • На windows 10 вылетает wifi на ноутбуке
  • Windows 10 pro workstation что это
  • Windows server таблица версий
  • Microsoft windows search filter hosts
  • Windows 10 format indir