Logstash Overview
Logstash is an open-source data collection engine with real-time pipeline processing capabilities. It collects data from a wide range of sources, transforms and enriches it along the way, and ships the result to a target storage system. Within the ELK stack, Logstash is the core component responsible for data ingestion and processing.
Logstash's Core Features
# Logstash core features
logstash_features = {
    "data collection": {
        "role": "gather data from a variety of sources",
        "supports": "files, databases, message queues, APIs, etc."
    },
    "data processing": {
        "role": "parse, transform, and enrich data",
        "supports": "Grok parsing, JSON parsing, field conversion, etc."
    },
    "data output": {
        "role": "ship processed data to target systems",
        "supports": "Elasticsearch, files, databases, message queues, etc."
    }
}
Logstash Architecture
┌─────────────┐
│    Input    │ ◄─── data input
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Filter    │ ◄─── data processing
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Output    │ ◄─── data output
└─────────────┘
Installation and Deployment
1. System Requirements
#!/bin/bash
# System requirements check
echo "=== Logstash system requirements check ==="
# Check the Java version (Logstash 8.x requires Java 11 or later; recent releases bundle a JDK)
java -version
# Check memory (at least 2 GB recommended)
free -h
# Check disk space (at least 10 GB recommended)
df -h
2. Linux Installation
# Ubuntu/Debian installation
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install logstash
# CentOS/RHEL installation
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
sudo tee /etc/yum.repos.d/logstash.repo > /dev/null <<EOF
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
sudo yum install logstash
# Installing from the tarball
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.11.0-linux-x86_64.tar.gz
tar -xzf logstash-8.11.0-linux-x86_64.tar.gz
cd logstash-8.11.0/
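A quick way to confirm the binary works before any further setup:
# Print the installed Logstash version
bin/logstash --version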
3. Basic Configuration
# config/logstash.yml
# Node settings
node.name: logstash-node-1
path.data: /var/lib/logstash
path.logs: /var/log/logstash
# Pipeline settings
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
# Queue settings
queue.type: memory
queue.max_bytes: 1gb
# Monitoring settings (legacy internal collection uses the xpack.* keys)
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
4. Startup and Verification
# Starting Logstash
# Option 1: via systemd (recommended)
sudo systemctl daemon-reload
sudo systemctl enable logstash
sudo systemctl start logstash
# Option 2: run directly
bin/logstash -f config/logstash.conf
# Option 3: validate the configuration only
bin/logstash -f config/logstash.conf --config.test_and_exit
# Option 4: interactive smoke test
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
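For a quick sanity check, pipe a line into the interactive pipeline; the rubydebug codec prints each event as a Ruby-style hash (the exact field set varies by version):
echo "hello world" | bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
# Expected shape of the printed event (timestamp and host fields are illustrative):
# {
#       "message" => "hello world",
#      "@version" => "1",
#    "@timestamp" => 2024-01-01T00:00:00.000Z,
#          "host" => { "hostname" => "..." }
# }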
Basic Configuration
1. Simple Configuration Example
# config/simple.conf
input {
  stdin {}
}
filter {
  # filters go here
}
output {
  stdout {
    codec => rubydebug
  }
}
2. File Input Configuration
# config/file-input.conf
input {
  file {
    path => "/var/log/app.log"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb"
    codec => "plain"
    type => "app_log"
    tags => ["application", "log"]
  }
}
filter {
  # data processing goes here
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}
3. Multiple Input Sources
# config/multi-input.conf
input {
  # file input
  file {
    path => "/var/log/app.log"
    type => "app_log"
  }
  # TCP input
  tcp {
    port => 5000
    type => "tcp_log"
  }
  # HTTP input
  http {
    port => 8080
    type => "http_log"
  }
}
output {
  if [type] == "app_log" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "app-logs-%{+YYYY.MM.dd}"
    }
  } else if [type] == "tcp_log" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "tcp-logs-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "http-logs-%{+YYYY.MM.dd}"
    }
  }
}
Input Plugins (Input)
1. File Plugin
# file input plugin
input {
  file {
    path => "/var/log/*.log"
    start_position => "beginning"  # beginning or end
    sincedb_path => "/var/lib/logstash/sincedb"
    codec => "json"
    exclude => "*.gz"
    close_older => 3600
    ignore_older => 86400
    tags => ["file", "log"]
    type => "file_log"
  }
}
File plugin parameters:
- path: file path (wildcards supported)
- start_position: where to begin reading a new file (beginning or end)
- sincedb_path: path to the database that records read positions (see the testing tip after this list)
- codec: codec used to decode the input
- exclude: file patterns to skip
- close_older: seconds of inactivity after which a file handle is closed
- ignore_older: files not modified within this many seconds are ignored
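Because sincedb remembers how far each file has been read, a restart does not re-ingest old lines. A common trick during development (a sketch, not for production) is to point sincedb at /dev/null so every run starts from the beginning:
input {
  file {
    path => "/var/log/app.log"
    start_position => "beginning"
    # /dev/null discards read positions, so the file is re-read on every restart
    sincedb_path => "/dev/null"
  }
}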
2. Beats Plugin
# Beats input plugin (recommended for production)
input {
  beats {
    port => 5044
    host => "0.0.0.0"
    ssl => false
    # the certificate and key below only take effect when ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }
}
3. TCP/UDP Plugins
# TCP input
input {
  tcp {
    port => 5000
    codec => json_lines
    type => "tcp_input"
  }
}
# UDP input
input {
  udp {
    port => 5001
    codec => json
    type => "udp_input"
  }
}
4. HTTP Plugin
# HTTP input
input {
  http {
    port => 8080
    codec => json
    type => "http_input"
    user => "admin"
    password => "password"
  }
}
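With the pipeline above running, an event can be posted with curl (the credentials match the user/password set in the input; the JSON body is illustrative):
curl -u admin:password \
     -H "Content-Type: application/json" \
     -d '{"service": "demo", "msg": "test event"}' \
     http://localhost:8080
# a 200 response means the event was accepted into the pipeline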
5. Database Input
# JDBC input (provided by the logstash-integration-jdbc plugin, bundled with recent Logstash releases)
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"  # com.mysql.jdbc.Driver is the legacy pre-8 class name
    statement => "SELECT * FROM logs WHERE updated_at > :sql_last_value"
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
  }
}
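:sql_last_value is persisted between runs so that only new rows are fetched; where that state lives, and whether to reset it, can be controlled explicitly with the plugin's last_run_metadata_path and clean_run options (a minimal sketch; the path is illustrative):
input {
  jdbc {
    # ...connection settings as above...
    # file in which the last :sql_last_value is persisted between runs
    last_run_metadata_path => "/var/lib/logstash/jdbc_last_run"
    # set to true once to discard saved state and re-read everything
    clean_run => false
  }
}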
Filter Plugins (Filter)
1. Grok Plugin
Grok is Logstash's most powerful filter; it parses unstructured log text into structured fields:
# basic Grok usage
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"
    }
    # without overwrite, the capture is appended to the existing message field as an array
    overwrite => ["message"]
  }
}
# combining built-in patterns (Apache-style access log)
filter {
  grok {
    match => {
      "message" => "%{IP:client_ip} - - \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATH:path} HTTP/%{NUMBER:http_version}\" %{NUMBER:status_code} %{NUMBER:bytes}"
    }
  }
}
# multiple patterns (tried in order until one matches)
filter {
  grok {
    match => {
      "message" => [
        "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}",
        "%{IP:client_ip} %{GREEDYDATA:message}"
      ]
    }
    overwrite => ["message"]
  }
}
Common Grok patterns:
- %{IP:ip}: an IP address
- %{WORD:word}: a single word
- %{NUMBER:num}: a number
- %{TIMESTAMP_ISO8601:timestamp}: an ISO8601 timestamp
- %{LOGLEVEL:level}: a log level
- %{GREEDYDATA:data}: everything remaining on the line
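When the built-in patterns are not enough, one-off patterns can be declared inline with pattern_definitions (a minimal sketch; ORDER_ID is a made-up pattern name for illustration):
filter {
  grok {
    # define an ad-hoc pattern inline instead of shipping a patterns file
    pattern_definitions => { "ORDER_ID" => "ORD-[0-9]{6}" }
    match => { "message" => "%{ORDER_ID:order_id} %{GREEDYDATA:detail}" }
  }
}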
2. JSON Plugin
# JSON parsing into a nested field
filter {
  json {
    source => "message"
    target => "parsed"
  }
}
# JSON parsing into the event root
filter {
  json {
    source => "message"
  }
}
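If the source field does not contain valid JSON, the filter tags the event (by default with _jsonparsefailure), which makes bad records easy to route aside; a minimal sketch:
output {
  # divert events whose message could not be parsed as JSON
  if "_jsonparsefailure" in [tags] {
    file {
      path => "/var/log/logstash/json_errors.log"
    }
  }
}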
3. Date Plugin
# date parsing
filter {
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
  }
}
# matching several date formats
filter {
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss", "ISO8601", "UNIX"]
  }
}
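When timestamps carry no zone information, the filter falls back to the platform default timezone; pinning it explicitly avoids surprises when the Logstash host and the log producer disagree (a sketch, assuming logs written in Shanghai local time):
filter {
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
    # interpret zone-less timestamps as Asia/Shanghai rather than the host default
    timezone => "Asia/Shanghai"
  }
}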
4. Mutate Plugin
# field operations with mutate
filter {
  mutate {
    # add fields
    add_field => {
      "new_field" => "new_value"
      "environment" => "production"
    }
    # remove fields
    remove_field => ["old_field", "unwanted_field"]
    # rename a field
    rename => {
      "old_name" => "new_name"
    }
    # convert field types
    convert => {
      "price" => "float"
      "quantity" => "integer"
    }
    # case conversion
    uppercase => ["level"]
    lowercase => ["message"]
    # string substitution
    gsub => [
      "message", "\r", "",
      "path", "\\", "/"
    ]
    # split a field into an array
    split => {
      "tags" => ","
    }
    # merge one field into another
    merge => {
      "dest_field" => "source_field"
    }
  }
}
5. GeoIP Plugin
# GeoIP geolocation lookup
filter {
  geoip {
    source => "client_ip"
    target => "geoip"
    fields => ["city_name", "country_name", "location", "latitude", "longitude"]
  }
}
6. UserAgent Plugin
# user-agent parsing
filter {
  useragent {
    source => "user_agent"
    target => "ua"
  }
}
Output Plugins (Output)
1. Elasticsearch Output
# Elasticsearch output
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    # document_type was removed in 8.x; documents are always _doc
    user => "elastic"
    password => "password"
    template_name => "logstash"
    template => "/etc/logstash/templates/logstash.json"
    template_overwrite => true
  }
}
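On Elasticsearch 8.x, writing to a data stream is often preferable to date-suffixed indices, and the output plugin supports this directly (a minimal sketch; the type/dataset/namespace values are illustrative):
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # route events into the logs-app-default data stream
    data_stream => true
    data_stream_type => "logs"
    data_stream_dataset => "app"
    data_stream_namespace => "default"
  }
}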
# conditional output
output {
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "error-logs-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "app-logs-%{+YYYY.MM.dd}"
    }
  }
}
2. File Output
# file output
output {
  file {
    path => "/var/log/processed/%{+YYYY-MM-dd}/%{host}.log"
    codec => line {
      format => "%{message}"
    }
  }
}
3. Standard Output
# standard output (for debugging)
output {
  stdout {
    codec => rubydebug
  }
}
4. Multiple Outputs
# multiple outputs
output {
  # send to Elasticsearch
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  # also write to a file
  file {
    path => "/var/log/logstash/output.log"
  }
  # write parse failures to their own file
  if "_grokparsefailure" in [tags] {
    file {
      path => "/var/log/logstash/parse_errors.log"
    }
  }
}
Complete Configuration Example
1. Log Processing Pipeline
# config/log-processing.conf
input {
  beats {
    port => 5044
  }
}
filter {
  # parse JSON logs
  if [fields][log_type] == "json" {
    json {
      source => "message"
    }
  }
  # parse plain-text logs
  else {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}"
      }
      overwrite => ["message"]
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
    }
  }
  # enrich with geolocation
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geoip"
    }
  }
  # parse the user agent
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }
  # field cleanup
  mutate {
    remove_field => ["host", "agent", "ecs"]
    lowercase => ["level"]
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[fields][log_type]}-logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "${ES_PASSWORD}"
  }
}
Performance Tuning
1. Pipeline Settings
# config/logstash.yml
# number of worker threads (defaults to the number of CPU cores, which is a good starting point)
pipeline.workers: 4
# batch size
pipeline.batch.size: 500
# batch delay (milliseconds)
pipeline.batch.delay: 50
# queue type (persisted survives restarts at some throughput cost)
queue.type: persisted
queue.max_bytes: 10gb
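The effect of these settings can be observed through the monitoring API that Logstash exposes on port 9600 by default:
# per-pipeline event counts, filter timings, and queue statistics
curl -s http://localhost:9600/_node/stats/pipelines?pretty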
2. JVM Configuration
# config/jvm.options
# heap size (set -Xms and -Xmx to the same value to avoid resize pauses)
-Xms2g
-Xmx2g
# GC settings
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
Summary
Key takeaways for Logstash basics, installation, and configuration:
- Overview: Logstash's core features and architecture
- Installation and deployment: Linux installation (apt, yum, tarball) and base configuration
- Basic configuration: a simple pipeline, file input, multiple input sources
- Input plugins: File, Beats, TCP/UDP, HTTP, JDBC
- Filter plugins: Grok, JSON, Date, Mutate, GeoIP, UserAgent
- Output plugins: Elasticsearch, file, stdout, multiple outputs
- Complete example: a log-processing pipeline
- Performance tuning: pipeline settings and JVM configuration
With these fundamentals in hand, you can deploy and configure Logstash successfully and build a solid foundation for data collection and processing.
When reposting, please credit the source: 周志洋的博客 (Zhou Zhiyang's blog) » Logstash基础与安装配置