From 82fa9933614a41237dbe8800b1bd76a121b6cf11 Mon Sep 17 00:00:00 2001 From: Tommie Gannert Date: Sun, 21 Jun 2020 01:07:59 +0200 Subject: [PATCH] Splits the log source handling with a pluggable interface. Provides a cleaner split between log sources, specifically for not compiling with systemd libraries. This is in preparation for a new log source to read from Docker. --- logsource.go | 63 ++++++++++++++++ logsource_file.go | 78 ++++++++++++++++++++ logsource_file_test.go | 87 ++++++++++++++++++++++ logsource_systemd.go | 143 ++++++++++++++++++++++++++++++++++++ logsource_systemd_test.go | 150 ++++++++++++++++++++++++++++++++++++++ main.go | 33 +++------ nosystemd.go | 25 ------- postfix_exporter.go | 106 ++++++++------------------- postfix_exporter_test.go | 10 +-- systemd.go | 119 ------------------------------ 10 files changed, 568 insertions(+), 246 deletions(-) create mode 100644 logsource.go create mode 100644 logsource_file.go create mode 100644 logsource_file_test.go create mode 100644 logsource_systemd.go create mode 100644 logsource_systemd_test.go delete mode 100644 nosystemd.go delete mode 100644 systemd.go diff --git a/logsource.go b/logsource.go new file mode 100644 index 0000000..3f8318b --- /dev/null +++ b/logsource.go @@ -0,0 +1,63 @@ +package main + +import ( + "fmt" + "io" + + "github.com/alecthomas/kingpin" +) + +// A LogSourceFactory provides a repository of log sources that can be +// instantiated from command line flags. +type LogSourceFactory interface { + // Init adds the factory's struct fields as flags in the + // application. + Init(*kingpin.Application) + + // New attempts to create a new log source. This is called after + // flags have been parsed. Returning `nil, nil`, means the user + // didn't want this log source. + New() (LogSourceCloser, error) +} + +type LogSourceCloser interface { + io.Closer + LogSource +} + +var logSourceFactories []LogSourceFactory + +// RegisterLogSourceFactory can be called from module `init` functions +// to register factories. +func RegisterLogSourceFactory(lsf LogSourceFactory) { + logSourceFactories = append(logSourceFactories, lsf) +} + +// InitLogSourceFactories runs Init on all factories. The +// initialization order is arbitrary, except `fileLogSourceFactory` is +// always last (the fallback). The file log source must be last since +// it's enabled by default. +func InitLogSourceFactories(app *kingpin.Application) { + RegisterLogSourceFactory(&fileLogSourceFactory{}) + + for _, f := range logSourceFactories { + f.Init(app) + } +} + +// NewLogSourceFromFactories iterates through the factories and +// attempts to instantiate a log source. The first factory to return +// success wins. +func NewLogSourceFromFactories() (LogSourceCloser, error) { + for _, f := range logSourceFactories { + src, err := f.New() + if err != nil { + return nil, err + } + if src != nil { + return src, nil + } + } + + return nil, fmt.Errorf("no log source configured") +} diff --git a/logsource_file.go b/logsource_file.go new file mode 100644 index 0000000..dfe85e8 --- /dev/null +++ b/logsource_file.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "io" + "log" + + "github.com/alecthomas/kingpin" + "github.com/hpcloud/tail" +) + +// A FileLogSource can read lines from a file. +type FileLogSource struct { + tailer *tail.Tail +} + +// NewFileLogSource creates a new log source, tailing the given file. +func NewFileLogSource(path string) (*FileLogSource, error) { + tailer, err := tail.TailFile(path, tail.Config{ + ReOpen: true, // reopen the file if it's rotated + MustExist: true, // fail immediately if the file is missing or has incorrect permissions + Follow: true, // run in follow mode + Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file + Logger: tail.DiscardingLogger, + }) + if err != nil { + return nil, err + } + return &FileLogSource{tailer}, nil +} + +func (s *FileLogSource) Close() error { + defer s.tailer.Cleanup() + go func() { + // Stop() waits for the tailer goroutine to shut down, but it + // can be blocking on sending on the Lines channel... + for range s.tailer.Lines { + } + }() + return s.tailer.Stop() +} + +func (s *FileLogSource) Path() string { + return s.tailer.Filename +} + +func (s *FileLogSource) Read(ctx context.Context) (string, error) { + select { + case line, ok := <-s.tailer.Lines: + if !ok { + return "", io.EOF + } + return line.Text, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} + +// A fileLogSourceFactory is a factory than can create log sources +// from command line flags. +// +// Because this factory is enabled by default, it must always be +// registered last. +type fileLogSourceFactory struct { + path string +} + +func (f *fileLogSourceFactory) Init(app *kingpin.Application) { + app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path) +} + +func (f *fileLogSourceFactory) New() (LogSourceCloser, error) { + if f.path == "" { + return nil, nil + } + log.Printf("Reading log events from %s", f.path) + return NewFileLogSource(f.path) +} diff --git a/logsource_file_test.go b/logsource_file_test.go new file mode 100644 index 0000000..673be04 --- /dev/null +++ b/logsource_file_test.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestFileLogSource_Path(t *testing.T) { + path, close, err := setupFakeLogFile() + if err != nil { + t.Fatalf("setupFakeTailer failed: %v", err) + } + defer close() + + src, err := NewFileLogSource(path) + if err != nil { + t.Fatalf("NewFileLogSource failed: %v", err) + } + defer src.Close() + + assert.Equal(t, path, src.Path(), "Path should be set by New.") +} + +func TestFileLogSource_Read(t *testing.T) { + ctx := context.Background() + + path, close, err := setupFakeLogFile() + if err != nil { + t.Fatalf("setupFakeTailer failed: %v", err) + } + defer close() + + src, err := NewFileLogSource(path) + if err != nil { + t.Fatalf("NewFileLogSource failed: %v", err) + } + defer src.Close() + + s, err := src.Read(ctx) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") +} + +func setupFakeLogFile() (string, func(), error) { + f, err := ioutil.TempFile("", "filelogsource") + if err != nil { + return "", nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + defer os.Remove(f.Name()) + defer f.Close() + + for { + // The tailer seeks to the end and then does a + // follow. Keep writing lines so we know it wakes up and + // returns lines. + fmt.Fprintln(f, "Feb 13 23:31:30 ahost anid[123]: aline") + + select { + case <-time.After(10 * time.Millisecond): + // continue + case <-ctx.Done(): + return + } + } + }() + + return f.Name(), func() { + cancel() + wg.Wait() + }, nil +} diff --git a/logsource_systemd.go b/logsource_systemd.go new file mode 100644 index 0000000..76d3be8 --- /dev/null +++ b/logsource_systemd.go @@ -0,0 +1,143 @@ +// +build !nosystemd,linux + +package main + +import ( + "context" + "fmt" + "io" + "log" + "time" + + "github.com/alecthomas/kingpin" + "github.com/coreos/go-systemd/v22/sdjournal" +) + +// timeNow is a test fake injection point. +var timeNow = time.Now + +// A SystemdLogSource reads log records from the given Systemd +// journal. +type SystemdLogSource struct { + journal SystemdJournal + path string +} + +// A SystemdJournal is the journal interface that sdjournal.Journal +// provides. See https://pkg.go.dev/github.com/coreos/go-systemd/sdjournal?tab=doc +type SystemdJournal interface { + io.Closer + AddMatch(match string) error + GetEntry() (*sdjournal.JournalEntry, error) + Next() (uint64, error) + SeekRealtimeUsec(usec uint64) error + Wait(timeout time.Duration) int +} + +// NewSystemdLogSource returns a log source for reading Systemd +// journal entries. `unit` and `slice` provide filtering if non-empty +// (with `slice` taking precedence). +func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLogSource, error) { + logSrc := &SystemdLogSource{journal: j, path: path} + + var err error + if slice != "" { + err = logSrc.journal.AddMatch("_SYSTEMD_SLICE=" + slice) + } else if unit != "" { + err = logSrc.journal.AddMatch("_SYSTEMD_UNIT=" + unit) + } + if err != nil { + logSrc.journal.Close() + return nil, err + } + + // Start at end of journal + if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil { + logSrc.journal.Close() + return nil, err + } + + if r := logSrc.journal.Wait(1 * time.Second); r < 0 { + logSrc.journal.Close() + return nil, err + } + + return logSrc, nil +} + +func (s *SystemdLogSource) Close() error { + return s.journal.Close() +} + +func (s *SystemdLogSource) Path() string { + return s.path +} + +func (s *SystemdLogSource) Read(ctx context.Context) (string, error) { + c, err := s.journal.Next() + if err != nil { + return "", err + } + if c == 0 { + return "", io.EOF + } + + e, err := s.journal.GetEntry() + if err != nil { + return "", err + } + ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) + + return fmt.Sprintf( + "%s %s %s[%s]: %s", + ts.Format(time.Stamp), + e.Fields["_HOSTNAME"], + e.Fields["SYSLOG_IDENTIFIER"], + e.Fields["_PID"], + e.Fields["MESSAGE"], + ), nil +} + +// A systemdLogSourceFactory is a factory that can create +// SystemdLogSources from command line flags. +type systemdLogSourceFactory struct { + enable bool + unit, slice, path string +} + +func (f *systemdLogSourceFactory) Init(app *kingpin.Application) { + app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(&f.enable) + app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(&f.unit) + app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(&f.slice) + app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path) +} + +func (f *systemdLogSourceFactory) New() (LogSourceCloser, error) { + if !f.enable { + return nil, nil + } + + log.Println("Reading log events from systemd") + j, path, err := newSystemdJournal(f.path) + if err != nil { + return nil, err + } + return NewSystemdLogSource(j, path, f.unit, f.slice) +} + +// newSystemdJournal creates a journal handle. It returns the handle +// and a string representation of it. If `path` is empty, it connects +// to the local journald. +func newSystemdJournal(path string) (*sdjournal.Journal, string, error) { + if path != "" { + j, err := sdjournal.NewJournalFromDir(path) + return j, path, err + } + + j, err := sdjournal.NewJournal() + return j, "journald", err +} + +func init() { + RegisterLogSourceFactory(&systemdLogSourceFactory{}) +} diff --git a/logsource_systemd_test.go b/logsource_systemd_test.go new file mode 100644 index 0000000..a1f9c4a --- /dev/null +++ b/logsource_systemd_test.go @@ -0,0 +1,150 @@ +// +build !nosystemd,linux + +package main + +import ( + "context" + "io" + "os" + "testing" + "time" + + "github.com/coreos/go-systemd/v22/sdjournal" + "github.com/stretchr/testify/assert" +) + +func TestNewSystemdLogSource(t *testing.T) { + j := &fakeSystemdJournal{} + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + + assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.") + assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.") + assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.") + + if err := src.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + assert.Equal(t, 1, j.closeCalls, "A call to Close should be made.") +} + +func TestSystemdLogSource_Path(t *testing.T) { + j := &fakeSystemdJournal{} + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + defer src.Close() + + assert.Equal(t, "apath", src.Path(), "Path should be set by New.") +} + +func TestSystemdLogSource_Read(t *testing.T) { + ctx := context.Background() + + j := &fakeSystemdJournal{ + getEntryValues: []sdjournal.JournalEntry{ + { + Fields: map[string]string{ + "_HOSTNAME": "ahost", + "SYSLOG_IDENTIFIER": "anid", + "_PID": "123", + "MESSAGE": "aline", + }, + RealtimeTimestamp: 1234567890000000, + }, + }, + nextValues: []uint64{1}, + } + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + defer src.Close() + + s, err := src.Read(ctx) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") +} + +func TestSystemdLogSource_ReadEOF(t *testing.T) { + ctx := context.Background() + + j := &fakeSystemdJournal{ + nextValues: []uint64{0}, + } + src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") + if err != nil { + t.Fatalf("NewSystemdLogSource failed: %v", err) + } + defer src.Close() + + _, err = src.Read(ctx) + assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.") +} + +func TestMain(m *testing.M) { + // We compare Unix timestamps to date strings, so make it deterministic. + os.Setenv("TZ", "UTC") + timeNow = func() time.Time { return time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) } + defer func() { + timeNow = time.Now + }() + + os.Exit(m.Run()) +} + +type fakeSystemdJournal struct { + getEntryValues []sdjournal.JournalEntry + getEntryError error + nextValues []uint64 + nextError error + + addMatchCalls []string + closeCalls int + seekRealtimeUsecCalls []uint64 + waitCalls []time.Duration +} + +func (j *fakeSystemdJournal) AddMatch(match string) error { + j.addMatchCalls = append(j.addMatchCalls, match) + return nil +} + +func (j *fakeSystemdJournal) Close() error { + j.closeCalls++ + return nil +} + +func (j *fakeSystemdJournal) GetEntry() (*sdjournal.JournalEntry, error) { + if len(j.getEntryValues) == 0 { + return nil, j.getEntryError + } + e := j.getEntryValues[0] + j.getEntryValues = j.getEntryValues[1:] + return &e, nil +} + +func (j *fakeSystemdJournal) Next() (uint64, error) { + if len(j.nextValues) == 0 { + return 0, j.nextError + } + v := j.nextValues[0] + j.nextValues = j.nextValues[1:] + return v, nil +} + +func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error { + j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec) + return nil +} + +func (j *fakeSystemdJournal) Wait(timeout time.Duration) int { + j.waitCalls = append(j.waitCalls, timeout) + return 0 +} diff --git a/main.go b/main.go index 2eaa9fc..2e5e8cb 100644 --- a/main.go +++ b/main.go @@ -13,36 +13,25 @@ import ( func main() { var ( - app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") - listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() - metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() - postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() - postfixLogfilePath = app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").String() - logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() - systemdEnable bool - systemdUnit, systemdSlice, systemdJournalPath string + app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") + listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() + metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() + postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() + logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() ) - systemdFlags(&systemdEnable, &systemdUnit, &systemdSlice, &systemdJournalPath, app) + InitLogSourceFactories(app) kingpin.MustParse(app.Parse(os.Args[1:])) - var journal *Journal - if systemdEnable { - var err error - journal, err = NewJournal(systemdUnit, systemdSlice, systemdJournalPath) - if err != nil { - log.Fatalf("Error opening systemd journal: %s", err) - } - defer journal.Close() - log.Println("Reading log events from systemd") - } else { - log.Printf("Reading log events from %v", *postfixLogfilePath) + logSrc, err := NewLogSourceFromFactories() + if err != nil { + log.Fatalf("Error opening log source: %s", err) } + defer logSrc.Close() exporter, err := NewPostfixExporter( *postfixShowqPath, - *postfixLogfilePath, - journal, + logSrc, *logUnsupportedLines, ) if err != nil { diff --git a/nosystemd.go b/nosystemd.go deleted file mode 100644 index 901d270..0000000 --- a/nosystemd.go +++ /dev/null @@ -1,25 +0,0 @@ -// +build nosystemd !linux -// This file contains stubs to support non-systemd use - -package main - -import ( - "io" - - "github.com/alecthomas/kingpin" -) - -type Journal struct { - io.Closer - Path string -} - -func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {} - -func NewJournal(unit, slice, path string) (*Journal, error) { - return nil, nil -} - -func (e *PostfixExporter) CollectLogfileFromJournal() error { - return nil -} diff --git a/postfix_exporter.go b/postfix_exporter.go index b9683b1..226542d 100644 --- a/postfix_exporter.go +++ b/postfix_exporter.go @@ -27,7 +27,6 @@ import ( "strings" "time" - "github.com/hpcloud/tail" "github.com/prometheus/client_golang/prometheus" ) @@ -42,8 +41,7 @@ var ( // Postfix Prometheus metrics exporter across scrapes. type PostfixExporter struct { showqPath string - journal *Journal - tailer *tail.Tail + logSrc LogSource logUnsupportedLines bool // Metrics that should persist after refreshes, based on logs. @@ -72,6 +70,16 @@ type PostfixExporter struct { opendkimSignatureAdded *prometheus.CounterVec } +// A LogSource is an interface to read log lines. +type LogSource interface { + // Path returns a representation of the log location. + Path() string + + // Read returns the next log line. Returns `io.EOF` at the end of + // the log. + Read(context.Context) (string, error) +} + // CollectShowqFromReader parses the output of Postfix's 'showq' command // and turns it into metrics. // @@ -420,50 +428,13 @@ func addToHistogramVec(h *prometheus.HistogramVec, value, fieldName string, labe h.WithLabelValues(labels...).Observe(float) } -// CollectLogfileFromFile tails a Postfix log file and collects entries from it. -func (e *PostfixExporter) CollectLogfileFromFile(ctx context.Context) { - gaugeVec := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "postfix", - Subsystem: "", - Name: "up", - Help: "Whether scraping Postfix's metrics was successful.", - }, - []string{"path"}) - gauge := gaugeVec.WithLabelValues(e.tailer.Filename) - for { - select { - case line := <-e.tailer.Lines: - e.CollectFromLogLine(line.Text) - case <-ctx.Done(): - gauge.Set(0) - return - } - gauge.Set(1) - } -} - // NewPostfixExporter creates a new Postfix exporter instance. -func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, logUnsupportedLines bool) (*PostfixExporter, error) { - var tailer *tail.Tail - if logfilePath != "" { - var err error - tailer, err = tail.TailFile(logfilePath, tail.Config{ - ReOpen: true, // reopen the file if it's rotated - MustExist: true, // fail immediately if the file is missing or has incorrect permissions - Follow: true, // run in follow mode - Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file - }) - if err != nil { - return nil, err - } - } +func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines bool) (*PostfixExporter, error) { timeBuckets := []float64{1e-3, 1e-2, 1e-1, 1.0, 10, 1 * 60, 1 * 60 * 60, 24 * 60 * 60, 2 * 24 * 60 * 60} return &PostfixExporter{ logUnsupportedLines: logUnsupportedLines, showqPath: showqPath, - tailer: tailer, - journal: journal, + logSrc: logSrc, cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "postfix", @@ -613,7 +584,7 @@ func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { ch <- postfixUpDesc - if e.tailer == nil && e.journal == nil { + if e.logSrc == nil { return } ch <- e.cleanupProcesses.Desc() @@ -641,38 +612,12 @@ func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { e.opendkimSignatureAdded.Describe(ch) } -func (e *PostfixExporter) foreverCollectFromJournal(ctx context.Context) { - gauge := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "postfix", - Subsystem: "", - Name: "up", - Help: "Whether scraping Postfix's metrics was successful.", - }, - []string{"path"}).WithLabelValues(e.journal.Path) - select { - case <-ctx.Done(): - gauge.Set(0) - return - default: - err := e.CollectLogfileFromJournal() - if err != nil { - log.Printf("Couldn't read journal: %v", err) - gauge.Set(0) - } else { - gauge.Set(1) - } - } -} - func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { - if e.journal != nil { - e.foreverCollectFromJournal(ctx) - } else if e.tailer != nil { - e.CollectLogfileFromFile(ctx) + if e.logSrc == nil { + return } - prometheus.NewGaugeVec( + gaugeVec := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "postfix", Subsystem: "", @@ -680,7 +625,20 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { Help: "Whether scraping Postfix's metrics was successful.", }, []string{"path"}) - return + gauge := gaugeVec.WithLabelValues(e.logSrc.Path()) + defer gauge.Set(0) + + for { + line, err := e.logSrc.Read(ctx) + if err != nil { + if err != io.EOF { + log.Printf("Couldn't read journal: %v", err) + } + return + } + e.CollectFromLogLine(line) + gauge.Set(1) + } } // Collect metrics from Postfix's showq socket and its log file. @@ -701,7 +659,7 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) { e.showqPath) } - if e.tailer == nil && e.journal == nil { + if e.logSrc == nil { return } ch <- e.cleanupProcesses diff --git a/postfix_exporter_test.go b/postfix_exporter_test.go index d0d06f2..a3426a4 100644 --- a/postfix_exporter_test.go +++ b/postfix_exporter_test.go @@ -1,18 +1,17 @@ package main import ( - "github.com/hpcloud/tail" + "testing" + "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" - "testing" ) func TestPostfixExporter_CollectFromLogline(t *testing.T) { type fields struct { showqPath string - journal *Journal - tailer *tail.Tail + logSrc LogSource cleanupProcesses prometheus.Counter cleanupRejects prometheus.Counter cleanupNotAccepted prometheus.Counter @@ -173,8 +172,7 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) { t.Run(tt.name, func(t *testing.T) { e := &PostfixExporter{ showqPath: tt.fields.showqPath, - journal: tt.fields.journal, - tailer: tt.fields.tailer, + logSrc: tt.fields.logSrc, cleanupProcesses: tt.fields.cleanupProcesses, cleanupRejects: tt.fields.cleanupRejects, cleanupNotAccepted: tt.fields.cleanupNotAccepted, diff --git a/systemd.go b/systemd.go deleted file mode 100644 index e459c8b..0000000 --- a/systemd.go +++ /dev/null @@ -1,119 +0,0 @@ -// +build !nosystemd,linux - -package main - -import ( - "fmt" - "log" - "sync" - "time" - - "github.com/alecthomas/kingpin" - "github.com/coreos/go-systemd/v22/sdjournal" -) - -// Journal represents a lockable systemd journal. -type Journal struct { - *sdjournal.Journal - sync.Mutex - Path string -} - -// NewJournal returns a Journal for reading journal entries. -func NewJournal(unit, slice, path string) (j *Journal, err error) { - j = new(Journal) - if path != "" { - j.Journal, err = sdjournal.NewJournalFromDir(path) - j.Path = path - } else { - j.Journal, err = sdjournal.NewJournal() - j.Path = "journald" - } - if err != nil { - return - } - - if slice != "" { - err = j.AddMatch("_SYSTEMD_SLICE=" + slice) - if err != nil { - return - } - } else if unit != "" { - err = j.AddMatch("_SYSTEMD_UNIT=" + unit) - if err != nil { - return - } - } - - // Start at end of journal - err = j.SeekRealtimeUsec(uint64(time.Now().UnixNano() / 1000)) - if err != nil { - log.Printf("%v", err) - } - return -} - -// NextMessage reads the next message from the journal. -func (j *Journal) NextMessage() (s string, c uint64, err error) { - var e *sdjournal.JournalEntry - - // Read to next - c, err = j.Next() - if err != nil { - return - } - // Return when on the end of journal - if c == 0 { - return - } - - // Get entry - e, err = j.GetEntry() - if err != nil { - return - } - ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) - - // Format entry - s = fmt.Sprintf( - "%s %s %s[%s]: %s", - ts.Format(time.Stamp), - e.Fields["_HOSTNAME"], - e.Fields["SYSLOG_IDENTIFIER"], - e.Fields["_PID"], - e.Fields["MESSAGE"], - ) - - return -} - -// systemdFlags sets the flags for use with systemd -func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) { - app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(enable) - app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(unit) - app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(slice) - app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(path) -} - -// CollectLogfileFromJournal Collects entries from the systemd journal. -func (e *PostfixExporter) CollectLogfileFromJournal() error { - e.journal.Lock() - defer e.journal.Unlock() - - r := e.journal.Wait(time.Duration(1) * time.Second) - if r < 0 { - log.Print("error while waiting for journal!") - } - for { - m, c, err := e.journal.NextMessage() - if err != nil { - return err - } - if c == 0 { - break - } - e.CollectFromLogLine(m) - } - - return nil -}