pipeline
在一开始,就介绍过,Logstash 对日志的处理,从 input 到 output,就像在 Linux 命令行上的管道操作一样。事实上,在 Logstash 中,对此有一个专门的名词,叫 Pipeline。
Pipeline 的代码加载路径如下:
bin/logstash
->lib/logstash/runner.rb
->lib/logstash/agent.rb
->lib/logstash/pipeline.rb
Logstash 2.2 版对 pipeline 做了大幅重构,最新版的 pipeline.rb
,可以归纳成下面这么一段缩略版的代码:
@config = grammar.parse(configstr)
code = @config.compile
eval(code)
@input_queue = LogStash::Util::WrappedSynchronousQueue.new
LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
@inputs.each do |input|
input.register
@input_threads << Thread.new do
LogStash::Util::set_thread_name("[#{pipeline_id}]<#{input.class.config_name}")
plugin.run(@input_queue)
end
end
@outputs.each {|o| o.register }
@filters.each {|f| f.register }
@settings[:pipeline_workers].times do |t|
@worker_threads << Thread.new do
LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
while true
input_batch = []
batch_size.times do |t|
event = (t == 0) ? @input_queue.take : @input_queue.poll(batch_delay)
input_batch << event
end
input_batch.reduce([]) do |acc,e|
filtered = filter_func(e)
filtered.each {|fe| acc << fe unless fe.cancelled?}
acc
end
.reduce(Hash.new { |h, k| h[k] = [] }) do |acc, event|
outputs_for_event = output_func(event) || []
outputs_for_event.each { |output| acc[output] << event }
acc
end
.each { |output, events| output.multi_receive(events) }
end
end
end
整个缩略版,可以了解到一个关键信息,对我们理解 Logstash 原理是最有用的:@input_queue
是一个固定大小为 0 的多线程同步队列。filter 和 output 插件,则在相同的 pipeline_worker
线程中运行,该线程每次批量获取数据,也批量传递给 filter 和 output 插件。
由于 input 到 filter 之间有唯一的队列,任意一个 filter 或者 output 发生堵塞,都会一直堵塞到最前端的接收。这也是 logstash-input-heartbeat 的理论基础。
注:2.2 版这种改造,导致 logstash-output-elasticsearch 的 ESClient 数量比过去大幅增加,对写入 Elasticsearch 的性能是不利的。目前官方已经意识到这个问题,正在实现一个多线程共享的 ESClient 对象。在此之前,建议大家谨慎使用。