diff --git a/README.md b/README.md index 1be73b8..e59f4f9 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ This exporter provides histogram metrics for the size and age of messages stored the mail queue. It extracts these metrics from Postfix by connecting to a UNIX socket under `/var/spool`. It also counts events by parsing Postfix's log entries, using regular expression matching. The log entries are retrieved from -the systemd journal, the Docker logs, or from a log file. +the systemd journal or from a log file. ## Options @@ -18,25 +18,11 @@ These options can be used when starting the `postfix_exporter` | `--postfix.showq_path` | Path at which Postfix places its showq socket | `/var/spool/postfix/public/showq` | | `--postfix.logfile_path` | Path where Postfix writes log entries | `/var/log/maillog` | | `--log.unsupported` | Log all unsupported lines | `false` | -| `--docker.enable` | Read from the Docker logs instead of a file | `false` | -| `--docker.container.id` | The container to read Docker logs from | `postfix` | -| `--systemd.enable` | Read from the systemd journal instead of file | `false` | +| `--systemd.enable` | Read from the systemd journal instead of log | `false` | | `--systemd.unit` | Name of the Postfix systemd unit | `postfix.service` | | `--systemd.slice` | Name of the Postfix systemd slice. | `""` | | `--systemd.journal_path` | Path to the systemd journal | `""` | -## Events from Docker - -Postfix servers running in a [Docker](https://www.docker.com/) -container can be monitored using the `--docker.enable` flag. The -default container ID is `postfix`, but can be customized with the -`--docker.container.id` flag. - -The default is to connect to the local Docker, but this can be -customized using [the `DOCKER_HOST` and -similar](https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient) -environment variables. - ## Events from log file The log file is tailed when processed. Rotating the log files while the exporter diff --git a/go.mod b/go.mod index f35270f..3d5e663 100644 --- a/go.mod +++ b/go.mod @@ -5,15 +5,8 @@ go 1.13 require ( github.com/alecthomas/kingpin v2.2.6+incompatible github.com/coreos/go-systemd/v22 v22.0.0 - github.com/docker/distribution v2.7.1+incompatible // indirect - github.com/docker/docker v1.13.1 - github.com/docker/go-connections v0.4.0 // indirect - github.com/docker/go-units v0.4.0 // indirect github.com/fsnotify/fsnotify v1.4.7 // indirect github.com/hpcloud/tail v1.0.0 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect - github.com/opencontainers/go-digest v1.0.0 // indirect github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_model v0.2.0 github.com/stretchr/testify v1.4.0 diff --git a/go.sum b/go.sum index fedcb99..bea5d93 100644 --- a/go.sum +++ b/go.sum @@ -17,14 +17,6 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= -github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo= -github.com/docker/docker v1.13.1/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= -github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= -github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= -github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -61,10 +53,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= -github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -94,7 +83,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/logsource.go b/logsource.go deleted file mode 100644 index 492d2e2..0000000 --- a/logsource.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io" - - "github.com/alecthomas/kingpin" -) - -// A LogSourceFactory provides a repository of log sources that can be -// instantiated from command line flags. -type LogSourceFactory interface { - // Init adds the factory's struct fields as flags in the - // application. - Init(*kingpin.Application) - - // New attempts to create a new log source. This is called after - // flags have been parsed. Returning `nil, nil`, means the user - // didn't want this log source. - New(context.Context) (LogSourceCloser, error) -} - -type LogSourceCloser interface { - io.Closer - LogSource -} - -var logSourceFactories []LogSourceFactory - -// RegisterLogSourceFactory can be called from module `init` functions -// to register factories. -func RegisterLogSourceFactory(lsf LogSourceFactory) { - logSourceFactories = append(logSourceFactories, lsf) -} - -// InitLogSourceFactories runs Init on all factories. The -// initialization order is arbitrary, except `fileLogSourceFactory` is -// always last (the fallback). The file log source must be last since -// it's enabled by default. -func InitLogSourceFactories(app *kingpin.Application) { - RegisterLogSourceFactory(&fileLogSourceFactory{}) - - for _, f := range logSourceFactories { - f.Init(app) - } -} - -// NewLogSourceFromFactories iterates through the factories and -// attempts to instantiate a log source. The first factory to return -// success wins. -func NewLogSourceFromFactories(ctx context.Context) (LogSourceCloser, error) { - for _, f := range logSourceFactories { - src, err := f.New(ctx) - if err != nil { - return nil, err - } - if src != nil { - return src, nil - } - } - - return nil, fmt.Errorf("no log source configured") -} diff --git a/logsource_docker.go b/logsource_docker.go deleted file mode 100644 index fa182a7..0000000 --- a/logsource_docker.go +++ /dev/null @@ -1,96 +0,0 @@ -// +build !nodocker - -package main - -import ( - "bufio" - "context" - "io" - "log" - "strings" - - "github.com/alecthomas/kingpin" - "github.com/docker/docker/api/types" - "github.com/docker/docker/client" -) - -// A DockerLogSource reads log records from the given Docker -// journal. -type DockerLogSource struct { - client DockerClient - containerID string - reader *bufio.Reader -} - -// A DockerClient is the client interface that client.Client -// provides. See https://pkg.go.dev/github.com/docker/docker/client -type DockerClient interface { - io.Closer - ContainerLogs(context.Context, string, types.ContainerLogsOptions) (io.ReadCloser, error) -} - -// NewDockerLogSource returns a log source for reading Docker logs. -func NewDockerLogSource(ctx context.Context, c DockerClient, containerID string) (*DockerLogSource, error) { - r, err := c.ContainerLogs(ctx, containerID, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Tail: "0", - }) - if err != nil { - return nil, err - } - - logSrc := &DockerLogSource{ - client: c, - containerID: containerID, - reader: bufio.NewReader(r), - } - - return logSrc, nil -} - -func (s *DockerLogSource) Close() error { - return s.client.Close() -} - -func (s *DockerLogSource) Path() string { - return "docker:" + s.containerID -} - -func (s *DockerLogSource) Read(ctx context.Context) (string, error) { - line, err := s.reader.ReadString('\n') - if err != nil { - return "", err - } - return strings.TrimSpace(line), nil -} - -// A dockerLogSourceFactory is a factory that can create -// DockerLogSources from command line flags. -type dockerLogSourceFactory struct { - enable bool - containerID string -} - -func (f *dockerLogSourceFactory) Init(app *kingpin.Application) { - app.Flag("docker.enable", "Read from Docker logs. Environment variable DOCKER_HOST can be used to change the address. See https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient for more information.").Default("false").BoolVar(&f.enable) - app.Flag("docker.container.id", "ID/name of the Postfix Docker container.").Default("postfix").StringVar(&f.containerID) -} - -func (f *dockerLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { - if !f.enable { - return nil, nil - } - - log.Println("Reading log events from Docker") - c, err := client.NewEnvClient() - if err != nil { - return nil, err - } - return NewDockerLogSource(ctx, c, f.containerID) -} - -func init() { - RegisterLogSourceFactory(&dockerLogSourceFactory{}) -} diff --git a/logsource_docker_test.go b/logsource_docker_test.go deleted file mode 100644 index 74231c2..0000000 --- a/logsource_docker_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// +build !nodocker - -package main - -import ( - "context" - "io" - "io/ioutil" - "strings" - "testing" - - "github.com/docker/docker/api/types" - "github.com/stretchr/testify/assert" -) - -func TestNewDockerLogSource(t *testing.T) { - ctx := context.Background() - c := &fakeDockerClient{} - src, err := NewDockerLogSource(ctx, c, "acontainer") - if err != nil { - t.Fatalf("NewDockerLogSource failed: %v", err) - } - - assert.Equal(t, []string{"acontainer"}, c.containerLogsCalls, "A call to ContainerLogs should be made.") - - if err := src.Close(); err != nil { - t.Fatalf("Close failed: %v", err) - } - - assert.Equal(t, 1, c.closeCalls, "A call to Close should be made.") -} - -func TestDockerLogSource_Path(t *testing.T) { - ctx := context.Background() - c := &fakeDockerClient{} - src, err := NewDockerLogSource(ctx, c, "acontainer") - if err != nil { - t.Fatalf("NewDockerLogSource failed: %v", err) - } - defer src.Close() - - assert.Equal(t, "docker:acontainer", src.Path(), "Path should be set by New.") -} - -func TestDockerLogSource_Read(t *testing.T) { - ctx := context.Background() - - c := &fakeDockerClient{ - logsReader: ioutil.NopCloser(strings.NewReader("Feb 13 23:31:30 ahost anid[123]: aline\n")), - } - src, err := NewDockerLogSource(ctx, c, "acontainer") - if err != nil { - t.Fatalf("NewDockerLogSource failed: %v", err) - } - defer src.Close() - - s, err := src.Read(ctx) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") -} - -type fakeDockerClient struct { - logsReader io.ReadCloser - - containerLogsCalls []string - closeCalls int -} - -func (c *fakeDockerClient) ContainerLogs(ctx context.Context, containerID string, opts types.ContainerLogsOptions) (io.ReadCloser, error) { - c.containerLogsCalls = append(c.containerLogsCalls, containerID) - return c.logsReader, nil -} - -func (c *fakeDockerClient) Close() error { - c.closeCalls++ - return nil -} diff --git a/logsource_file.go b/logsource_file.go deleted file mode 100644 index b906a84..0000000 --- a/logsource_file.go +++ /dev/null @@ -1,78 +0,0 @@ -package main - -import ( - "context" - "io" - "log" - - "github.com/alecthomas/kingpin" - "github.com/hpcloud/tail" -) - -// A FileLogSource can read lines from a file. -type FileLogSource struct { - tailer *tail.Tail -} - -// NewFileLogSource creates a new log source, tailing the given file. -func NewFileLogSource(path string) (*FileLogSource, error) { - tailer, err := tail.TailFile(path, tail.Config{ - ReOpen: true, // reopen the file if it's rotated - MustExist: true, // fail immediately if the file is missing or has incorrect permissions - Follow: true, // run in follow mode - Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file - Logger: tail.DiscardingLogger, - }) - if err != nil { - return nil, err - } - return &FileLogSource{tailer}, nil -} - -func (s *FileLogSource) Close() error { - defer s.tailer.Cleanup() - go func() { - // Stop() waits for the tailer goroutine to shut down, but it - // can be blocking on sending on the Lines channel... - for range s.tailer.Lines { - } - }() - return s.tailer.Stop() -} - -func (s *FileLogSource) Path() string { - return s.tailer.Filename -} - -func (s *FileLogSource) Read(ctx context.Context) (string, error) { - select { - case line, ok := <-s.tailer.Lines: - if !ok { - return "", io.EOF - } - return line.Text, nil - case <-ctx.Done(): - return "", ctx.Err() - } -} - -// A fileLogSourceFactory is a factory than can create log sources -// from command line flags. -// -// Because this factory is enabled by default, it must always be -// registered last. -type fileLogSourceFactory struct { - path string -} - -func (f *fileLogSourceFactory) Init(app *kingpin.Application) { - app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path) -} - -func (f *fileLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { - if f.path == "" { - return nil, nil - } - log.Printf("Reading log events from %s", f.path) - return NewFileLogSource(f.path) -} diff --git a/logsource_file_test.go b/logsource_file_test.go deleted file mode 100644 index 673be04..0000000 --- a/logsource_file_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io/ioutil" - "os" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestFileLogSource_Path(t *testing.T) { - path, close, err := setupFakeLogFile() - if err != nil { - t.Fatalf("setupFakeTailer failed: %v", err) - } - defer close() - - src, err := NewFileLogSource(path) - if err != nil { - t.Fatalf("NewFileLogSource failed: %v", err) - } - defer src.Close() - - assert.Equal(t, path, src.Path(), "Path should be set by New.") -} - -func TestFileLogSource_Read(t *testing.T) { - ctx := context.Background() - - path, close, err := setupFakeLogFile() - if err != nil { - t.Fatalf("setupFakeTailer failed: %v", err) - } - defer close() - - src, err := NewFileLogSource(path) - if err != nil { - t.Fatalf("NewFileLogSource failed: %v", err) - } - defer src.Close() - - s, err := src.Read(ctx) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") -} - -func setupFakeLogFile() (string, func(), error) { - f, err := ioutil.TempFile("", "filelogsource") - if err != nil { - return "", nil, err - } - - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - defer os.Remove(f.Name()) - defer f.Close() - - for { - // The tailer seeks to the end and then does a - // follow. Keep writing lines so we know it wakes up and - // returns lines. - fmt.Fprintln(f, "Feb 13 23:31:30 ahost anid[123]: aline") - - select { - case <-time.After(10 * time.Millisecond): - // continue - case <-ctx.Done(): - return - } - } - }() - - return f.Name(), func() { - cancel() - wg.Wait() - }, nil -} diff --git a/logsource_systemd.go b/logsource_systemd.go deleted file mode 100644 index 60f5cb6..0000000 --- a/logsource_systemd.go +++ /dev/null @@ -1,143 +0,0 @@ -// +build !nosystemd,linux - -package main - -import ( - "context" - "fmt" - "io" - "log" - "time" - - "github.com/alecthomas/kingpin" - "github.com/coreos/go-systemd/v22/sdjournal" -) - -// timeNow is a test fake injection point. -var timeNow = time.Now - -// A SystemdLogSource reads log records from the given Systemd -// journal. -type SystemdLogSource struct { - journal SystemdJournal - path string -} - -// A SystemdJournal is the journal interface that sdjournal.Journal -// provides. See https://pkg.go.dev/github.com/coreos/go-systemd/sdjournal?tab=doc -type SystemdJournal interface { - io.Closer - AddMatch(match string) error - GetEntry() (*sdjournal.JournalEntry, error) - Next() (uint64, error) - SeekRealtimeUsec(usec uint64) error - Wait(timeout time.Duration) int -} - -// NewSystemdLogSource returns a log source for reading Systemd -// journal entries. `unit` and `slice` provide filtering if non-empty -// (with `slice` taking precedence). -func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLogSource, error) { - logSrc := &SystemdLogSource{journal: j, path: path} - - var err error - if slice != "" { - err = logSrc.journal.AddMatch("_SYSTEMD_SLICE=" + slice) - } else if unit != "" { - err = logSrc.journal.AddMatch("_SYSTEMD_UNIT=" + unit) - } - if err != nil { - logSrc.journal.Close() - return nil, err - } - - // Start at end of journal - if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil { - logSrc.journal.Close() - return nil, err - } - - if r := logSrc.journal.Wait(1 * time.Second); r < 0 { - logSrc.journal.Close() - return nil, err - } - - return logSrc, nil -} - -func (s *SystemdLogSource) Close() error { - return s.journal.Close() -} - -func (s *SystemdLogSource) Path() string { - return s.path -} - -func (s *SystemdLogSource) Read(ctx context.Context) (string, error) { - c, err := s.journal.Next() - if err != nil { - return "", err - } - if c == 0 { - return "", io.EOF - } - - e, err := s.journal.GetEntry() - if err != nil { - return "", err - } - ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) - - return fmt.Sprintf( - "%s %s %s[%s]: %s", - ts.Format(time.Stamp), - e.Fields["_HOSTNAME"], - e.Fields["SYSLOG_IDENTIFIER"], - e.Fields["_PID"], - e.Fields["MESSAGE"], - ), nil -} - -// A systemdLogSourceFactory is a factory that can create -// SystemdLogSources from command line flags. -type systemdLogSourceFactory struct { - enable bool - unit, slice, path string -} - -func (f *systemdLogSourceFactory) Init(app *kingpin.Application) { - app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(&f.enable) - app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(&f.unit) - app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(&f.slice) - app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path) -} - -func (f *systemdLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { - if !f.enable { - return nil, nil - } - - log.Println("Reading log events from systemd") - j, path, err := newSystemdJournal(f.path) - if err != nil { - return nil, err - } - return NewSystemdLogSource(j, path, f.unit, f.slice) -} - -// newSystemdJournal creates a journal handle. It returns the handle -// and a string representation of it. If `path` is empty, it connects -// to the local journald. -func newSystemdJournal(path string) (*sdjournal.Journal, string, error) { - if path != "" { - j, err := sdjournal.NewJournalFromDir(path) - return j, path, err - } - - j, err := sdjournal.NewJournal() - return j, "journald", err -} - -func init() { - RegisterLogSourceFactory(&systemdLogSourceFactory{}) -} diff --git a/logsource_systemd_test.go b/logsource_systemd_test.go deleted file mode 100644 index a1f9c4a..0000000 --- a/logsource_systemd_test.go +++ /dev/null @@ -1,150 +0,0 @@ -// +build !nosystemd,linux - -package main - -import ( - "context" - "io" - "os" - "testing" - "time" - - "github.com/coreos/go-systemd/v22/sdjournal" - "github.com/stretchr/testify/assert" -) - -func TestNewSystemdLogSource(t *testing.T) { - j := &fakeSystemdJournal{} - src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") - if err != nil { - t.Fatalf("NewSystemdLogSource failed: %v", err) - } - - assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.") - assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.") - assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.") - - if err := src.Close(); err != nil { - t.Fatalf("Close failed: %v", err) - } - - assert.Equal(t, 1, j.closeCalls, "A call to Close should be made.") -} - -func TestSystemdLogSource_Path(t *testing.T) { - j := &fakeSystemdJournal{} - src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") - if err != nil { - t.Fatalf("NewSystemdLogSource failed: %v", err) - } - defer src.Close() - - assert.Equal(t, "apath", src.Path(), "Path should be set by New.") -} - -func TestSystemdLogSource_Read(t *testing.T) { - ctx := context.Background() - - j := &fakeSystemdJournal{ - getEntryValues: []sdjournal.JournalEntry{ - { - Fields: map[string]string{ - "_HOSTNAME": "ahost", - "SYSLOG_IDENTIFIER": "anid", - "_PID": "123", - "MESSAGE": "aline", - }, - RealtimeTimestamp: 1234567890000000, - }, - }, - nextValues: []uint64{1}, - } - src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") - if err != nil { - t.Fatalf("NewSystemdLogSource failed: %v", err) - } - defer src.Close() - - s, err := src.Read(ctx) - if err != nil { - t.Fatalf("Read failed: %v", err) - } - assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") -} - -func TestSystemdLogSource_ReadEOF(t *testing.T) { - ctx := context.Background() - - j := &fakeSystemdJournal{ - nextValues: []uint64{0}, - } - src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice") - if err != nil { - t.Fatalf("NewSystemdLogSource failed: %v", err) - } - defer src.Close() - - _, err = src.Read(ctx) - assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.") -} - -func TestMain(m *testing.M) { - // We compare Unix timestamps to date strings, so make it deterministic. - os.Setenv("TZ", "UTC") - timeNow = func() time.Time { return time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) } - defer func() { - timeNow = time.Now - }() - - os.Exit(m.Run()) -} - -type fakeSystemdJournal struct { - getEntryValues []sdjournal.JournalEntry - getEntryError error - nextValues []uint64 - nextError error - - addMatchCalls []string - closeCalls int - seekRealtimeUsecCalls []uint64 - waitCalls []time.Duration -} - -func (j *fakeSystemdJournal) AddMatch(match string) error { - j.addMatchCalls = append(j.addMatchCalls, match) - return nil -} - -func (j *fakeSystemdJournal) Close() error { - j.closeCalls++ - return nil -} - -func (j *fakeSystemdJournal) GetEntry() (*sdjournal.JournalEntry, error) { - if len(j.getEntryValues) == 0 { - return nil, j.getEntryError - } - e := j.getEntryValues[0] - j.getEntryValues = j.getEntryValues[1:] - return &e, nil -} - -func (j *fakeSystemdJournal) Next() (uint64, error) { - if len(j.nextValues) == 0 { - return 0, j.nextError - } - v := j.nextValues[0] - j.nextValues = j.nextValues[1:] - return v, nil -} - -func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error { - j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec) - return nil -} - -func (j *fakeSystemdJournal) Wait(timeout time.Duration) int { - j.waitCalls = append(j.waitCalls, timeout) - return 0 -} diff --git a/main.go b/main.go index c92d7d5..2eaa9fc 100644 --- a/main.go +++ b/main.go @@ -13,26 +13,36 @@ import ( func main() { var ( - ctx = context.Background() - app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") - listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() - metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() - postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() - logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() + app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") + listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() + metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() + postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() + postfixLogfilePath = app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").String() + logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() + systemdEnable bool + systemdUnit, systemdSlice, systemdJournalPath string ) + systemdFlags(&systemdEnable, &systemdUnit, &systemdSlice, &systemdJournalPath, app) - InitLogSourceFactories(app) kingpin.MustParse(app.Parse(os.Args[1:])) - logSrc, err := NewLogSourceFromFactories(ctx) - if err != nil { - log.Fatalf("Error opening log source: %s", err) + var journal *Journal + if systemdEnable { + var err error + journal, err = NewJournal(systemdUnit, systemdSlice, systemdJournalPath) + if err != nil { + log.Fatalf("Error opening systemd journal: %s", err) + } + defer journal.Close() + log.Println("Reading log events from systemd") + } else { + log.Printf("Reading log events from %v", *postfixLogfilePath) } - defer logSrc.Close() exporter, err := NewPostfixExporter( *postfixShowqPath, - logSrc, + *postfixLogfilePath, + journal, *logUnsupportedLines, ) if err != nil { @@ -54,7 +64,7 @@ func main() { panic(err) } }) - ctx, cancelFunc := context.WithCancel(ctx) + ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() go exporter.StartMetricCollection(ctx) log.Print("Listening on ", *listenAddress) diff --git a/nosystemd.go b/nosystemd.go new file mode 100644 index 0000000..901d270 --- /dev/null +++ b/nosystemd.go @@ -0,0 +1,25 @@ +// +build nosystemd !linux +// This file contains stubs to support non-systemd use + +package main + +import ( + "io" + + "github.com/alecthomas/kingpin" +) + +type Journal struct { + io.Closer + Path string +} + +func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {} + +func NewJournal(unit, slice, path string) (*Journal, error) { + return nil, nil +} + +func (e *PostfixExporter) CollectLogfileFromJournal() error { + return nil +} diff --git a/postfix_exporter.go b/postfix_exporter.go index bf8de9e..b9683b1 100644 --- a/postfix_exporter.go +++ b/postfix_exporter.go @@ -27,6 +27,7 @@ import ( "strings" "time" + "github.com/hpcloud/tail" "github.com/prometheus/client_golang/prometheus" ) @@ -41,7 +42,8 @@ var ( // Postfix Prometheus metrics exporter across scrapes. type PostfixExporter struct { showqPath string - logSrc LogSource + journal *Journal + tailer *tail.Tail logUnsupportedLines bool // Metrics that should persist after refreshes, based on logs. @@ -70,16 +72,6 @@ type PostfixExporter struct { opendkimSignatureAdded *prometheus.CounterVec } -// A LogSource is an interface to read log lines. -type LogSource interface { - // Path returns a representation of the log location. - Path() string - - // Read returns the next log line. Returns `io.EOF` at the end of - // the log. - Read(context.Context) (string, error) -} - // CollectShowqFromReader parses the output of Postfix's 'showq' command // and turns it into metrics. // @@ -428,12 +420,50 @@ func addToHistogramVec(h *prometheus.HistogramVec, value, fieldName string, labe h.WithLabelValues(labels...).Observe(float) } +// CollectLogfileFromFile tails a Postfix log file and collects entries from it. +func (e *PostfixExporter) CollectLogfileFromFile(ctx context.Context) { + gaugeVec := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "postfix", + Subsystem: "", + Name: "up", + Help: "Whether scraping Postfix's metrics was successful.", + }, + []string{"path"}) + gauge := gaugeVec.WithLabelValues(e.tailer.Filename) + for { + select { + case line := <-e.tailer.Lines: + e.CollectFromLogLine(line.Text) + case <-ctx.Done(): + gauge.Set(0) + return + } + gauge.Set(1) + } +} + // NewPostfixExporter creates a new Postfix exporter instance. -func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines bool) (*PostfixExporter, error) { +func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, logUnsupportedLines bool) (*PostfixExporter, error) { + var tailer *tail.Tail + if logfilePath != "" { + var err error + tailer, err = tail.TailFile(logfilePath, tail.Config{ + ReOpen: true, // reopen the file if it's rotated + MustExist: true, // fail immediately if the file is missing or has incorrect permissions + Follow: true, // run in follow mode + Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file + }) + if err != nil { + return nil, err + } + } + timeBuckets := []float64{1e-3, 1e-2, 1e-1, 1.0, 10, 1 * 60, 1 * 60 * 60, 24 * 60 * 60, 2 * 24 * 60 * 60} return &PostfixExporter{ logUnsupportedLines: logUnsupportedLines, showqPath: showqPath, - logSrc: logSrc, + tailer: tailer, + journal: journal, cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "postfix", @@ -583,7 +613,7 @@ func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { ch <- postfixUpDesc - if e.logSrc == nil { + if e.tailer == nil && e.journal == nil { return } ch <- e.cleanupProcesses.Desc() @@ -611,12 +641,38 @@ func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { e.opendkimSignatureAdded.Describe(ch) } -func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { - if e.logSrc == nil { +func (e *PostfixExporter) foreverCollectFromJournal(ctx context.Context) { + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "postfix", + Subsystem: "", + Name: "up", + Help: "Whether scraping Postfix's metrics was successful.", + }, + []string{"path"}).WithLabelValues(e.journal.Path) + select { + case <-ctx.Done(): + gauge.Set(0) return + default: + err := e.CollectLogfileFromJournal() + if err != nil { + log.Printf("Couldn't read journal: %v", err) + gauge.Set(0) + } else { + gauge.Set(1) + } + } +} + +func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { + if e.journal != nil { + e.foreverCollectFromJournal(ctx) + } else if e.tailer != nil { + e.CollectLogfileFromFile(ctx) } - gaugeVec := prometheus.NewGaugeVec( + prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "postfix", Subsystem: "", @@ -624,20 +680,7 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { Help: "Whether scraping Postfix's metrics was successful.", }, []string{"path"}) - gauge := gaugeVec.WithLabelValues(e.logSrc.Path()) - defer gauge.Set(0) - - for { - line, err := e.logSrc.Read(ctx) - if err != nil { - if err != io.EOF { - log.Printf("Couldn't read journal: %v", err) - } - return - } - e.CollectFromLogLine(line) - gauge.Set(1) - } + return } // Collect metrics from Postfix's showq socket and its log file. @@ -658,7 +701,7 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) { e.showqPath) } - if e.logSrc == nil { + if e.tailer == nil && e.journal == nil { return } ch <- e.cleanupProcesses diff --git a/postfix_exporter_test.go b/postfix_exporter_test.go index a3426a4..d0d06f2 100644 --- a/postfix_exporter_test.go +++ b/postfix_exporter_test.go @@ -1,17 +1,18 @@ package main import ( - "testing" - + "github.com/hpcloud/tail" "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" + "testing" ) func TestPostfixExporter_CollectFromLogline(t *testing.T) { type fields struct { showqPath string - logSrc LogSource + journal *Journal + tailer *tail.Tail cleanupProcesses prometheus.Counter cleanupRejects prometheus.Counter cleanupNotAccepted prometheus.Counter @@ -172,7 +173,8 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) { t.Run(tt.name, func(t *testing.T) { e := &PostfixExporter{ showqPath: tt.fields.showqPath, - logSrc: tt.fields.logSrc, + journal: tt.fields.journal, + tailer: tt.fields.tailer, cleanupProcesses: tt.fields.cleanupProcesses, cleanupRejects: tt.fields.cleanupRejects, cleanupNotAccepted: tt.fields.cleanupNotAccepted, diff --git a/systemd.go b/systemd.go new file mode 100644 index 0000000..e459c8b --- /dev/null +++ b/systemd.go @@ -0,0 +1,119 @@ +// +build !nosystemd,linux + +package main + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/alecthomas/kingpin" + "github.com/coreos/go-systemd/v22/sdjournal" +) + +// Journal represents a lockable systemd journal. +type Journal struct { + *sdjournal.Journal + sync.Mutex + Path string +} + +// NewJournal returns a Journal for reading journal entries. +func NewJournal(unit, slice, path string) (j *Journal, err error) { + j = new(Journal) + if path != "" { + j.Journal, err = sdjournal.NewJournalFromDir(path) + j.Path = path + } else { + j.Journal, err = sdjournal.NewJournal() + j.Path = "journald" + } + if err != nil { + return + } + + if slice != "" { + err = j.AddMatch("_SYSTEMD_SLICE=" + slice) + if err != nil { + return + } + } else if unit != "" { + err = j.AddMatch("_SYSTEMD_UNIT=" + unit) + if err != nil { + return + } + } + + // Start at end of journal + err = j.SeekRealtimeUsec(uint64(time.Now().UnixNano() / 1000)) + if err != nil { + log.Printf("%v", err) + } + return +} + +// NextMessage reads the next message from the journal. +func (j *Journal) NextMessage() (s string, c uint64, err error) { + var e *sdjournal.JournalEntry + + // Read to next + c, err = j.Next() + if err != nil { + return + } + // Return when on the end of journal + if c == 0 { + return + } + + // Get entry + e, err = j.GetEntry() + if err != nil { + return + } + ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) + + // Format entry + s = fmt.Sprintf( + "%s %s %s[%s]: %s", + ts.Format(time.Stamp), + e.Fields["_HOSTNAME"], + e.Fields["SYSLOG_IDENTIFIER"], + e.Fields["_PID"], + e.Fields["MESSAGE"], + ) + + return +} + +// systemdFlags sets the flags for use with systemd +func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) { + app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(enable) + app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(unit) + app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(slice) + app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(path) +} + +// CollectLogfileFromJournal Collects entries from the systemd journal. +func (e *PostfixExporter) CollectLogfileFromJournal() error { + e.journal.Lock() + defer e.journal.Unlock() + + r := e.journal.Wait(time.Duration(1) * time.Second) + if r < 0 { + log.Print("error while waiting for journal!") + } + for { + m, c, err := e.journal.NextMessage() + if err != nil { + return err + } + if c == 0 { + break + } + e.CollectFromLogLine(m) + } + + return nil +}