Implement collection from the systemd journal

The following flags configure this usage:

- systemd.enable:       enable collection from systemd.
- systemd.unit:         systemd unit, postfix.service by default.
- systemd.slice:        systemd slice, overrides the unit.
- systemd.journal_path: path to the systemd journal directory.

Closes #2
This commit is contained in:
Silke 2018-01-28 14:13:49 +01:00
parent a8b4bed735
commit 82c04256dc
2 changed files with 246 additions and 111 deletions

View File

@ -44,6 +44,7 @@ var (
type PostfixExporter struct { type PostfixExporter struct {
showqPath string showqPath string
logfilePath string logfilePath string
journal *Journal
// Metrics that should persist after refreshes, based on logs. // Metrics that should persist after refreshes, based on logs.
cleanupProcesses prometheus.Counter cleanupProcesses prometheus.Counter
@ -251,27 +252,24 @@ func CollectShowqFromSocket(path string, ch chan<- prometheus.Metric) error {
return CollectShowqFromReader(fd, ch) return CollectShowqFromReader(fd, ch)
} }
// CollectLogfileFromReader collects metrics from a Postfix logfile, // Patterns for parsing log messages.
// using a reader object. var (
func (e *PostfixExporter) CollectLogfileFromReader(file io.Reader) error { logLine = regexp.MustCompile(" ?postfix/(\\w+)\\[\\d+\\]: (.*)")
scanner := bufio.NewScanner(file) lmtpPipeSMTPLine = regexp.MustCompile(", relay=(\\S+), .*, delays=([0-9\\.]+)/([0-9\\.]+)/([0-9\\.]+)/([0-9\\.]+), ")
scanner.Split(bufio.ScanLines) qmgrInsertLine = regexp.MustCompile(":.*, size=(\\d+), nrcpt=(\\d+) ")
smtpTLSLine = regexp.MustCompile("^(\\S+) TLS connection established to \\S+: (\\S+) with cipher (\\S+) \\((\\d+)/(\\d+) bits\\)$")
smtpdFCrDNSErrorsLine = regexp.MustCompile("^warning: hostname \\S+ does not resolve to address ")
smtpdProcessesSASLLine = regexp.MustCompile(": client=.*, sasl_username=(\\S+)")
smtpdRejectsLine = regexp.MustCompile("^NOQUEUE: reject: RCPT from \\S+: ([0-9]+) ")
smtpdLostConnectionLine = regexp.MustCompile("^lost connection after (\\w+) from ")
smtpdSASLAuthenticationFailuresLine = regexp.MustCompile("^warning: \\S+: SASL \\S+ authentication failed: ")
smtpdTLSLine = regexp.MustCompile("^(\\S+) TLS connection established from \\S+: (\\S+) with cipher (\\S+) \\((\\d+)/(\\d+) bits\\)$")
)
// Patterns for parsing log messages. // CollectFromLogline collects metrict from a Postfix log line.
logLine := regexp.MustCompile(" postfix/(\\w+)\\[\\d+\\]: (.*)") func (e *PostfixExporter) CollectFromLogline(line string) {
lmtpPipeSMTPLine := regexp.MustCompile(", relay=(\\S+), .*, delays=([0-9\\.]+)/([0-9\\.]+)/([0-9\\.]+)/([0-9\\.]+), ")
qmgrInsertLine := regexp.MustCompile(":.*, size=(\\d+), nrcpt=(\\d+) ")
smtpTLSLine := regexp.MustCompile("^(\\S+) TLS connection established to \\S+: (\\S+) with cipher (\\S+) \\((\\d+)/(\\d+) bits\\)$")
smtpdFCrDNSErrorsLine := regexp.MustCompile("^warning: hostname \\S+ does not resolve to address ")
smtpdProcessesSASLLine := regexp.MustCompile(": client=.*, sasl_username=(\\S+)")
smtpdRejectsLine := regexp.MustCompile("^NOQUEUE: reject: RCPT from \\S+: ([0-9]+) ")
smtpdLostConnectionLine := regexp.MustCompile("^lost connection after (\\w+) from ")
smtpdSASLAuthenticationFailuresLine := regexp.MustCompile("^warning: \\S+: SASL \\S+ authentication failed: ")
smtpdTLSLine := regexp.MustCompile("^(\\S+) TLS connection established from \\S+: (\\S+) with cipher (\\S+) \\((\\d+)/(\\d+) bits\\)$")
for scanner.Scan() {
// Strip off timestamp, hostname, etc. // Strip off timestamp, hostname, etc.
if logMatches := logLine.FindStringSubmatch(scanner.Text()); logMatches != nil { if logMatches := logLine.FindStringSubmatch(line); logMatches != nil {
// Group patterns to check by Postfix service. // Group patterns to check by Postfix service.
if logMatches[1] == "cleanup" { if logMatches[1] == "cleanup" {
if strings.Contains(logMatches[2], ": message-id=<") { if strings.Contains(logMatches[2], ": message-id=<") {
@ -363,6 +361,16 @@ func (e *PostfixExporter) CollectLogfileFromReader(file io.Reader) error {
// Unknown log entry format. // Unknown log entry format.
e.unsupportedLogEntries.WithLabelValues("").Inc() e.unsupportedLogEntries.WithLabelValues("").Inc()
} }
}
// CollectLogfileFromReader collects metrics from a Postfix logfile,
// using a reader object.
func (e *PostfixExporter) CollectLogfileFromReader(file io.Reader) error {
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
e.CollectFromLogline(scanner.Text())
} }
return scanner.Err() return scanner.Err()
@ -384,11 +392,31 @@ func (e *PostfixExporter) CollectLogfileFromFile(path string) error {
return fd.Truncate(0) return fd.Truncate(0)
} }
// CollectLogfileFromJournal Collects entries from the systemd journal.
func (e *PostfixExporter) CollectLogfileFromJournal() error {
e.journal.Lock()
defer e.journal.Unlock()
for {
m, c, err := e.journal.NextMessage()
if err != nil {
return err
}
if c == 0 {
break
}
e.CollectFromLogline(m)
}
return nil
}
// NewPostfixExporter creates a new Postfix exporter instance. // NewPostfixExporter creates a new Postfix exporter instance.
func NewPostfixExporter(showqPath string, logfilePath string) (*PostfixExporter, error) { func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal) (*PostfixExporter, error) {
return &PostfixExporter{ return &PostfixExporter{
showqPath: showqPath, showqPath: showqPath,
logfilePath: logfilePath, logfilePath: logfilePath,
journal: journal,
cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{ cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "postfix", Namespace: "postfix",
@ -548,20 +576,27 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) {
e.showqPath) e.showqPath)
} }
var src string
if e.journal != nil {
err = e.CollectLogfileFromJournal()
src = e.journal.Path
} else {
err = e.CollectLogfileFromFile(e.logfilePath) err = e.CollectLogfileFromFile(e.logfilePath)
src = e.logfilePath
}
if err == nil { if err == nil {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
postfixUpDesc, postfixUpDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
1.0, 1.0,
e.logfilePath) src)
} else { } else {
log.Printf("Failed to scrape logfile: %s", err) log.Printf("Failed to scrape log: %s", err)
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
postfixUpDesc, postfixUpDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
0.0, 0.0,
e.logfilePath) src)
} }
ch <- e.cleanupProcesses ch <- e.cleanupProcesses
@ -590,10 +625,30 @@ func main() {
metricsPath = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.") metricsPath = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.")
postfixShowqPath = flag.String("postfix.showq_path", "/var/spool/postfix/public/showq", "Path at which Postfix places its showq socket.") postfixShowqPath = flag.String("postfix.showq_path", "/var/spool/postfix/public/showq", "Path at which Postfix places its showq socket.")
postfixLogfilePath = flag.String("postfix.logfile_path", "/var/log/postfix_exporter_input.log", "Path where Postfix writes log entries. This file will be truncated by this exporter.") postfixLogfilePath = flag.String("postfix.logfile_path", "/var/log/postfix_exporter_input.log", "Path where Postfix writes log entries. This file will be truncated by this exporter.")
systemdEnable = flag.Bool("systemd.enable", false, "Read from the systemd journal instead of log")
systemdUnit = flag.String("systemd.unit", "postfix.service", "Name of the Postfix systemd unit.")
systemdSlice = flag.String("systemd.slice", "", "Name of the Postfix systemd slice. Overrides the systemd unit.")
systemdJournalPath = flag.String("systemd.journal_path", "", "Path to the systemd journal")
) )
flag.Parse() flag.Parse()
exporter, err := NewPostfixExporter(*postfixShowqPath, *postfixLogfilePath) var journal *Journal
if *systemdEnable {
var err error
journal, err = NewJournal(*systemdUnit, *systemdSlice, *systemdJournalPath)
if err != nil {
log.Fatalf("Error opening systemd journal: %s", err)
}
// Start at end of journal
journal.SeekRealtimeUsec(uint64(time.Now().UnixNano() / 1000))
defer journal.Close()
}
exporter, err := NewPostfixExporter(
*postfixShowqPath,
*postfixLogfilePath,
journal,
)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -610,5 +665,7 @@ func main() {
</body> </body>
</html>`)) </html>`))
}) })
log.Print("Listening on ", *listenAddress)
log.Fatal(http.ListenAndServe(*listenAddress, nil)) log.Fatal(http.ListenAndServe(*listenAddress, nil))
} }

78
systemd.go Normal file
View File

@ -0,0 +1,78 @@
package main
import (
"fmt"
"sync"
"time"
"github.com/coreos/go-systemd/sdjournal"
)
// Journal represents a lockable systemd journal.
type Journal struct {
*sdjournal.Journal
sync.Mutex
Path string
}
// NewJournal returns a Journal for reading journal entries.
func NewJournal(unit, slice, path string) (j *Journal, err error) {
j = new(Journal)
if path != "" {
j.Journal, err = sdjournal.NewJournalFromDir(path)
j.Path = path
} else {
j.Journal, err = sdjournal.NewJournal()
j.Path = "journald"
}
if err != nil {
return
}
if slice != "" {
err = j.AddMatch("_SYSTEMD_SLICE=" + slice)
if err != nil {
return
}
} else if unit != "" {
err = j.AddMatch("_SYSTEMD_UNIT=" + unit)
if err != nil {
return
}
}
return
}
// NextMessage reads the next message from the journal.
func (j *Journal) NextMessage() (s string, c uint64, err error) {
var e *sdjournal.JournalEntry
// Read to next
c, err = j.Next()
if err != nil {
return
}
// Return when on the end of journal
if c == 0 {
return
}
// Get entry
e, err = j.GetEntry()
if err != nil {
return
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))
// Format entry
s = fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
)
return
}