Revert "Adds a Docker log source."

This commit is contained in:
Bart Vercoulen 2021-03-16 15:19:10 +01:00 committed by GitHub
parent 9d314608ff
commit 1edadd0857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 250 additions and 781 deletions

View File

@ -5,7 +5,7 @@ This exporter provides histogram metrics for the size and age of messages stored
the mail queue. It extracts these metrics from Postfix by connecting to the mail queue. It extracts these metrics from Postfix by connecting to
a UNIX socket under `/var/spool`. It also counts events by parsing Postfix's a UNIX socket under `/var/spool`. It also counts events by parsing Postfix's
log entries, using regular expression matching. The log entries are retrieved from log entries, using regular expression matching. The log entries are retrieved from
the systemd journal, the Docker logs, or from a log file. the systemd journal or from a log file.
## Options ## Options
@ -18,25 +18,11 @@ These options can be used when starting the `postfix_exporter`
| `--postfix.showq_path` | Path at which Postfix places its showq socket | `/var/spool/postfix/public/showq` | | `--postfix.showq_path` | Path at which Postfix places its showq socket | `/var/spool/postfix/public/showq` |
| `--postfix.logfile_path` | Path where Postfix writes log entries | `/var/log/maillog` | | `--postfix.logfile_path` | Path where Postfix writes log entries | `/var/log/maillog` |
| `--log.unsupported` | Log all unsupported lines | `false` | | `--log.unsupported` | Log all unsupported lines | `false` |
| `--docker.enable` | Read from the Docker logs instead of a file | `false` | | `--systemd.enable` | Read from the systemd journal instead of log | `false` |
| `--docker.container.id` | The container to read Docker logs from | `postfix` |
| `--systemd.enable` | Read from the systemd journal instead of file | `false` |
| `--systemd.unit` | Name of the Postfix systemd unit | `postfix.service` | | `--systemd.unit` | Name of the Postfix systemd unit | `postfix.service` |
| `--systemd.slice` | Name of the Postfix systemd slice. | `""` | | `--systemd.slice` | Name of the Postfix systemd slice. | `""` |
| `--systemd.journal_path` | Path to the systemd journal | `""` | | `--systemd.journal_path` | Path to the systemd journal | `""` |
## Events from Docker
Postfix servers running in a [Docker](https://www.docker.com/)
container can be monitored using the `--docker.enable` flag. The
default container ID is `postfix`, but can be customized with the
`--docker.container.id` flag.
The default is to connect to the local Docker, but this can be
customized using [the `DOCKER_HOST` and
similar](https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient)
environment variables.
## Events from log file ## Events from log file
The log file is tailed when processed. Rotating the log files while the exporter The log file is tailed when processed. Rotating the log files while the exporter

7
go.mod
View File

@ -5,15 +5,8 @@ go 1.13
require ( require (
github.com/alecthomas/kingpin v2.2.6+incompatible github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/coreos/go-systemd/v22 v22.0.0 github.com/coreos/go-systemd/v22 v22.0.0
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/hpcloud/tail v1.0.0 github.com/hpcloud/tail v1.0.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_golang v1.4.1
github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.4.0 github.com/stretchr/testify v1.4.0

12
go.sum
View File

@ -17,14 +17,6 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo=
github.com/docker/docker v1.13.1/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@ -61,10 +53,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -94,7 +83,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View File

@ -1,64 +0,0 @@
package main
import (
"context"
"fmt"
"io"
"github.com/alecthomas/kingpin"
)
// A LogSourceFactory provides a repository of log sources that can be
// instantiated from command line flags.
type LogSourceFactory interface {
// Init adds the factory's struct fields as flags in the
// application.
Init(*kingpin.Application)
// New attempts to create a new log source. This is called after
// flags have been parsed. Returning `nil, nil`, means the user
// didn't want this log source.
New(context.Context) (LogSourceCloser, error)
}
type LogSourceCloser interface {
io.Closer
LogSource
}
var logSourceFactories []LogSourceFactory
// RegisterLogSourceFactory can be called from module `init` functions
// to register factories.
func RegisterLogSourceFactory(lsf LogSourceFactory) {
logSourceFactories = append(logSourceFactories, lsf)
}
// InitLogSourceFactories runs Init on all factories. The
// initialization order is arbitrary, except `fileLogSourceFactory` is
// always last (the fallback). The file log source must be last since
// it's enabled by default.
func InitLogSourceFactories(app *kingpin.Application) {
RegisterLogSourceFactory(&fileLogSourceFactory{})
for _, f := range logSourceFactories {
f.Init(app)
}
}
// NewLogSourceFromFactories iterates through the factories and
// attempts to instantiate a log source. The first factory to return
// success wins.
func NewLogSourceFromFactories(ctx context.Context) (LogSourceCloser, error) {
for _, f := range logSourceFactories {
src, err := f.New(ctx)
if err != nil {
return nil, err
}
if src != nil {
return src, nil
}
}
return nil, fmt.Errorf("no log source configured")
}

View File

@ -1,96 +0,0 @@
// +build !nodocker
package main
import (
"bufio"
"context"
"io"
"log"
"strings"
"github.com/alecthomas/kingpin"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
)
// A DockerLogSource reads log records from the given Docker
// journal.
type DockerLogSource struct {
client DockerClient
containerID string
reader *bufio.Reader
}
// A DockerClient is the client interface that client.Client
// provides. See https://pkg.go.dev/github.com/docker/docker/client
type DockerClient interface {
io.Closer
ContainerLogs(context.Context, string, types.ContainerLogsOptions) (io.ReadCloser, error)
}
// NewDockerLogSource returns a log source for reading Docker logs.
func NewDockerLogSource(ctx context.Context, c DockerClient, containerID string) (*DockerLogSource, error) {
r, err := c.ContainerLogs(ctx, containerID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Tail: "0",
})
if err != nil {
return nil, err
}
logSrc := &DockerLogSource{
client: c,
containerID: containerID,
reader: bufio.NewReader(r),
}
return logSrc, nil
}
func (s *DockerLogSource) Close() error {
return s.client.Close()
}
func (s *DockerLogSource) Path() string {
return "docker:" + s.containerID
}
func (s *DockerLogSource) Read(ctx context.Context) (string, error) {
line, err := s.reader.ReadString('\n')
if err != nil {
return "", err
}
return strings.TrimSpace(line), nil
}
// A dockerLogSourceFactory is a factory that can create
// DockerLogSources from command line flags.
type dockerLogSourceFactory struct {
enable bool
containerID string
}
func (f *dockerLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("docker.enable", "Read from Docker logs. Environment variable DOCKER_HOST can be used to change the address. See https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient for more information.").Default("false").BoolVar(&f.enable)
app.Flag("docker.container.id", "ID/name of the Postfix Docker container.").Default("postfix").StringVar(&f.containerID)
}
func (f *dockerLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) {
if !f.enable {
return nil, nil
}
log.Println("Reading log events from Docker")
c, err := client.NewEnvClient()
if err != nil {
return nil, err
}
return NewDockerLogSource(ctx, c, f.containerID)
}
func init() {
RegisterLogSourceFactory(&dockerLogSourceFactory{})
}

View File

@ -1,79 +0,0 @@
// +build !nodocker
package main
import (
"context"
"io"
"io/ioutil"
"strings"
"testing"
"github.com/docker/docker/api/types"
"github.com/stretchr/testify/assert"
)
func TestNewDockerLogSource(t *testing.T) {
ctx := context.Background()
c := &fakeDockerClient{}
src, err := NewDockerLogSource(ctx, c, "acontainer")
if err != nil {
t.Fatalf("NewDockerLogSource failed: %v", err)
}
assert.Equal(t, []string{"acontainer"}, c.containerLogsCalls, "A call to ContainerLogs should be made.")
if err := src.Close(); err != nil {
t.Fatalf("Close failed: %v", err)
}
assert.Equal(t, 1, c.closeCalls, "A call to Close should be made.")
}
func TestDockerLogSource_Path(t *testing.T) {
ctx := context.Background()
c := &fakeDockerClient{}
src, err := NewDockerLogSource(ctx, c, "acontainer")
if err != nil {
t.Fatalf("NewDockerLogSource failed: %v", err)
}
defer src.Close()
assert.Equal(t, "docker:acontainer", src.Path(), "Path should be set by New.")
}
func TestDockerLogSource_Read(t *testing.T) {
ctx := context.Background()
c := &fakeDockerClient{
logsReader: ioutil.NopCloser(strings.NewReader("Feb 13 23:31:30 ahost anid[123]: aline\n")),
}
src, err := NewDockerLogSource(ctx, c, "acontainer")
if err != nil {
t.Fatalf("NewDockerLogSource failed: %v", err)
}
defer src.Close()
s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}
type fakeDockerClient struct {
logsReader io.ReadCloser
containerLogsCalls []string
closeCalls int
}
func (c *fakeDockerClient) ContainerLogs(ctx context.Context, containerID string, opts types.ContainerLogsOptions) (io.ReadCloser, error) {
c.containerLogsCalls = append(c.containerLogsCalls, containerID)
return c.logsReader, nil
}
func (c *fakeDockerClient) Close() error {
c.closeCalls++
return nil
}

View File

@ -1,78 +0,0 @@
package main
import (
"context"
"io"
"log"
"github.com/alecthomas/kingpin"
"github.com/hpcloud/tail"
)
// A FileLogSource can read lines from a file.
type FileLogSource struct {
tailer *tail.Tail
}
// NewFileLogSource creates a new log source, tailing the given file.
func NewFileLogSource(path string) (*FileLogSource, error) {
tailer, err := tail.TailFile(path, tail.Config{
ReOpen: true, // reopen the file if it's rotated
MustExist: true, // fail immediately if the file is missing or has incorrect permissions
Follow: true, // run in follow mode
Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file
Logger: tail.DiscardingLogger,
})
if err != nil {
return nil, err
}
return &FileLogSource{tailer}, nil
}
func (s *FileLogSource) Close() error {
defer s.tailer.Cleanup()
go func() {
// Stop() waits for the tailer goroutine to shut down, but it
// can be blocking on sending on the Lines channel...
for range s.tailer.Lines {
}
}()
return s.tailer.Stop()
}
func (s *FileLogSource) Path() string {
return s.tailer.Filename
}
func (s *FileLogSource) Read(ctx context.Context) (string, error) {
select {
case line, ok := <-s.tailer.Lines:
if !ok {
return "", io.EOF
}
return line.Text, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// A fileLogSourceFactory is a factory than can create log sources
// from command line flags.
//
// Because this factory is enabled by default, it must always be
// registered last.
type fileLogSourceFactory struct {
path string
}
func (f *fileLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path)
}
func (f *fileLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) {
if f.path == "" {
return nil, nil
}
log.Printf("Reading log events from %s", f.path)
return NewFileLogSource(f.path)
}

View File

@ -1,87 +0,0 @@
package main
import (
"context"
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestFileLogSource_Path(t *testing.T) {
path, close, err := setupFakeLogFile()
if err != nil {
t.Fatalf("setupFakeTailer failed: %v", err)
}
defer close()
src, err := NewFileLogSource(path)
if err != nil {
t.Fatalf("NewFileLogSource failed: %v", err)
}
defer src.Close()
assert.Equal(t, path, src.Path(), "Path should be set by New.")
}
func TestFileLogSource_Read(t *testing.T) {
ctx := context.Background()
path, close, err := setupFakeLogFile()
if err != nil {
t.Fatalf("setupFakeTailer failed: %v", err)
}
defer close()
src, err := NewFileLogSource(path)
if err != nil {
t.Fatalf("NewFileLogSource failed: %v", err)
}
defer src.Close()
s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}
func setupFakeLogFile() (string, func(), error) {
f, err := ioutil.TempFile("", "filelogsource")
if err != nil {
return "", nil, err
}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer os.Remove(f.Name())
defer f.Close()
for {
// The tailer seeks to the end and then does a
// follow. Keep writing lines so we know it wakes up and
// returns lines.
fmt.Fprintln(f, "Feb 13 23:31:30 ahost anid[123]: aline")
select {
case <-time.After(10 * time.Millisecond):
// continue
case <-ctx.Done():
return
}
}
}()
return f.Name(), func() {
cancel()
wg.Wait()
}, nil
}

View File

@ -1,143 +0,0 @@
// +build !nosystemd,linux
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"github.com/alecthomas/kingpin"
"github.com/coreos/go-systemd/v22/sdjournal"
)
// timeNow is a test fake injection point.
var timeNow = time.Now
// A SystemdLogSource reads log records from the given Systemd
// journal.
type SystemdLogSource struct {
journal SystemdJournal
path string
}
// A SystemdJournal is the journal interface that sdjournal.Journal
// provides. See https://pkg.go.dev/github.com/coreos/go-systemd/sdjournal?tab=doc
type SystemdJournal interface {
io.Closer
AddMatch(match string) error
GetEntry() (*sdjournal.JournalEntry, error)
Next() (uint64, error)
SeekRealtimeUsec(usec uint64) error
Wait(timeout time.Duration) int
}
// NewSystemdLogSource returns a log source for reading Systemd
// journal entries. `unit` and `slice` provide filtering if non-empty
// (with `slice` taking precedence).
func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLogSource, error) {
logSrc := &SystemdLogSource{journal: j, path: path}
var err error
if slice != "" {
err = logSrc.journal.AddMatch("_SYSTEMD_SLICE=" + slice)
} else if unit != "" {
err = logSrc.journal.AddMatch("_SYSTEMD_UNIT=" + unit)
}
if err != nil {
logSrc.journal.Close()
return nil, err
}
// Start at end of journal
if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil {
logSrc.journal.Close()
return nil, err
}
if r := logSrc.journal.Wait(1 * time.Second); r < 0 {
logSrc.journal.Close()
return nil, err
}
return logSrc, nil
}
func (s *SystemdLogSource) Close() error {
return s.journal.Close()
}
func (s *SystemdLogSource) Path() string {
return s.path
}
func (s *SystemdLogSource) Read(ctx context.Context) (string, error) {
c, err := s.journal.Next()
if err != nil {
return "", err
}
if c == 0 {
return "", io.EOF
}
e, err := s.journal.GetEntry()
if err != nil {
return "", err
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))
return fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
), nil
}
// A systemdLogSourceFactory is a factory that can create
// SystemdLogSources from command line flags.
type systemdLogSourceFactory struct {
enable bool
unit, slice, path string
}
func (f *systemdLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(&f.enable)
app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(&f.unit)
app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(&f.slice)
app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path)
}
func (f *systemdLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) {
if !f.enable {
return nil, nil
}
log.Println("Reading log events from systemd")
j, path, err := newSystemdJournal(f.path)
if err != nil {
return nil, err
}
return NewSystemdLogSource(j, path, f.unit, f.slice)
}
// newSystemdJournal creates a journal handle. It returns the handle
// and a string representation of it. If `path` is empty, it connects
// to the local journald.
func newSystemdJournal(path string) (*sdjournal.Journal, string, error) {
if path != "" {
j, err := sdjournal.NewJournalFromDir(path)
return j, path, err
}
j, err := sdjournal.NewJournal()
return j, "journald", err
}
func init() {
RegisterLogSourceFactory(&systemdLogSourceFactory{})
}

View File

@ -1,150 +0,0 @@
// +build !nosystemd,linux
package main
import (
"context"
"io"
"os"
"testing"
"time"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/stretchr/testify/assert"
)
func TestNewSystemdLogSource(t *testing.T) {
j := &fakeSystemdJournal{}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.")
assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.")
assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.")
if err := src.Close(); err != nil {
t.Fatalf("Close failed: %v", err)
}
assert.Equal(t, 1, j.closeCalls, "A call to Close should be made.")
}
func TestSystemdLogSource_Path(t *testing.T) {
j := &fakeSystemdJournal{}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
defer src.Close()
assert.Equal(t, "apath", src.Path(), "Path should be set by New.")
}
func TestSystemdLogSource_Read(t *testing.T) {
ctx := context.Background()
j := &fakeSystemdJournal{
getEntryValues: []sdjournal.JournalEntry{
{
Fields: map[string]string{
"_HOSTNAME": "ahost",
"SYSLOG_IDENTIFIER": "anid",
"_PID": "123",
"MESSAGE": "aline",
},
RealtimeTimestamp: 1234567890000000,
},
},
nextValues: []uint64{1},
}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
defer src.Close()
s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}
func TestSystemdLogSource_ReadEOF(t *testing.T) {
ctx := context.Background()
j := &fakeSystemdJournal{
nextValues: []uint64{0},
}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
defer src.Close()
_, err = src.Read(ctx)
assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.")
}
func TestMain(m *testing.M) {
// We compare Unix timestamps to date strings, so make it deterministic.
os.Setenv("TZ", "UTC")
timeNow = func() time.Time { return time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) }
defer func() {
timeNow = time.Now
}()
os.Exit(m.Run())
}
type fakeSystemdJournal struct {
getEntryValues []sdjournal.JournalEntry
getEntryError error
nextValues []uint64
nextError error
addMatchCalls []string
closeCalls int
seekRealtimeUsecCalls []uint64
waitCalls []time.Duration
}
func (j *fakeSystemdJournal) AddMatch(match string) error {
j.addMatchCalls = append(j.addMatchCalls, match)
return nil
}
func (j *fakeSystemdJournal) Close() error {
j.closeCalls++
return nil
}
func (j *fakeSystemdJournal) GetEntry() (*sdjournal.JournalEntry, error) {
if len(j.getEntryValues) == 0 {
return nil, j.getEntryError
}
e := j.getEntryValues[0]
j.getEntryValues = j.getEntryValues[1:]
return &e, nil
}
func (j *fakeSystemdJournal) Next() (uint64, error) {
if len(j.nextValues) == 0 {
return 0, j.nextError
}
v := j.nextValues[0]
j.nextValues = j.nextValues[1:]
return v, nil
}
func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error {
j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec)
return nil
}
func (j *fakeSystemdJournal) Wait(timeout time.Duration) int {
j.waitCalls = append(j.waitCalls, timeout)
return 0
}

36
main.go
View File

@ -13,26 +13,36 @@ import (
func main() { func main() {
var ( var (
ctx = context.Background() app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix")
app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix") listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String()
listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String() metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String()
postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String() postfixLogfilePath = app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").String()
logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool() logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool()
systemdEnable bool
systemdUnit, systemdSlice, systemdJournalPath string
) )
systemdFlags(&systemdEnable, &systemdUnit, &systemdSlice, &systemdJournalPath, app)
InitLogSourceFactories(app)
kingpin.MustParse(app.Parse(os.Args[1:])) kingpin.MustParse(app.Parse(os.Args[1:]))
logSrc, err := NewLogSourceFromFactories(ctx) var journal *Journal
if err != nil { if systemdEnable {
log.Fatalf("Error opening log source: %s", err) var err error
journal, err = NewJournal(systemdUnit, systemdSlice, systemdJournalPath)
if err != nil {
log.Fatalf("Error opening systemd journal: %s", err)
}
defer journal.Close()
log.Println("Reading log events from systemd")
} else {
log.Printf("Reading log events from %v", *postfixLogfilePath)
} }
defer logSrc.Close()
exporter, err := NewPostfixExporter( exporter, err := NewPostfixExporter(
*postfixShowqPath, *postfixShowqPath,
logSrc, *postfixLogfilePath,
journal,
*logUnsupportedLines, *logUnsupportedLines,
) )
if err != nil { if err != nil {
@ -54,7 +64,7 @@ func main() {
panic(err) panic(err)
} }
}) })
ctx, cancelFunc := context.WithCancel(ctx) ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc() defer cancelFunc()
go exporter.StartMetricCollection(ctx) go exporter.StartMetricCollection(ctx)
log.Print("Listening on ", *listenAddress) log.Print("Listening on ", *listenAddress)

25
nosystemd.go Normal file
View File

@ -0,0 +1,25 @@
// +build nosystemd !linux
// This file contains stubs to support non-systemd use
package main
import (
"io"
"github.com/alecthomas/kingpin"
)
type Journal struct {
io.Closer
Path string
}
func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {}
func NewJournal(unit, slice, path string) (*Journal, error) {
return nil, nil
}
func (e *PostfixExporter) CollectLogfileFromJournal() error {
return nil
}

View File

@ -27,6 +27,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -41,7 +42,8 @@ var (
// Postfix Prometheus metrics exporter across scrapes. // Postfix Prometheus metrics exporter across scrapes.
type PostfixExporter struct { type PostfixExporter struct {
showqPath string showqPath string
logSrc LogSource journal *Journal
tailer *tail.Tail
logUnsupportedLines bool logUnsupportedLines bool
// Metrics that should persist after refreshes, based on logs. // Metrics that should persist after refreshes, based on logs.
@ -70,16 +72,6 @@ type PostfixExporter struct {
opendkimSignatureAdded *prometheus.CounterVec opendkimSignatureAdded *prometheus.CounterVec
} }
// A LogSource is an interface to read log lines.
type LogSource interface {
// Path returns a representation of the log location.
Path() string
// Read returns the next log line. Returns `io.EOF` at the end of
// the log.
Read(context.Context) (string, error)
}
// CollectShowqFromReader parses the output of Postfix's 'showq' command // CollectShowqFromReader parses the output of Postfix's 'showq' command
// and turns it into metrics. // and turns it into metrics.
// //
@ -428,12 +420,50 @@ func addToHistogramVec(h *prometheus.HistogramVec, value, fieldName string, labe
h.WithLabelValues(labels...).Observe(float) h.WithLabelValues(labels...).Observe(float)
} }
// CollectLogfileFromFile tails a Postfix log file and collects entries from it.
func (e *PostfixExporter) CollectLogfileFromFile(ctx context.Context) {
gaugeVec := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"})
gauge := gaugeVec.WithLabelValues(e.tailer.Filename)
for {
select {
case line := <-e.tailer.Lines:
e.CollectFromLogLine(line.Text)
case <-ctx.Done():
gauge.Set(0)
return
}
gauge.Set(1)
}
}
// NewPostfixExporter creates a new Postfix exporter instance. // NewPostfixExporter creates a new Postfix exporter instance.
func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines bool) (*PostfixExporter, error) { func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, logUnsupportedLines bool) (*PostfixExporter, error) {
var tailer *tail.Tail
if logfilePath != "" {
var err error
tailer, err = tail.TailFile(logfilePath, tail.Config{
ReOpen: true, // reopen the file if it's rotated
MustExist: true, // fail immediately if the file is missing or has incorrect permissions
Follow: true, // run in follow mode
Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file
})
if err != nil {
return nil, err
}
}
timeBuckets := []float64{1e-3, 1e-2, 1e-1, 1.0, 10, 1 * 60, 1 * 60 * 60, 24 * 60 * 60, 2 * 24 * 60 * 60}
return &PostfixExporter{ return &PostfixExporter{
logUnsupportedLines: logUnsupportedLines, logUnsupportedLines: logUnsupportedLines,
showqPath: showqPath, showqPath: showqPath,
logSrc: logSrc, tailer: tailer,
journal: journal,
cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{ cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "postfix", Namespace: "postfix",
@ -583,7 +613,7 @@ func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines
func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) { func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) {
ch <- postfixUpDesc ch <- postfixUpDesc
if e.logSrc == nil { if e.tailer == nil && e.journal == nil {
return return
} }
ch <- e.cleanupProcesses.Desc() ch <- e.cleanupProcesses.Desc()
@ -611,12 +641,38 @@ func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) {
e.opendkimSignatureAdded.Describe(ch) e.opendkimSignatureAdded.Describe(ch)
} }
func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { func (e *PostfixExporter) foreverCollectFromJournal(ctx context.Context) {
if e.logSrc == nil { gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"}).WithLabelValues(e.journal.Path)
select {
case <-ctx.Done():
gauge.Set(0)
return return
default:
err := e.CollectLogfileFromJournal()
if err != nil {
log.Printf("Couldn't read journal: %v", err)
gauge.Set(0)
} else {
gauge.Set(1)
}
}
}
func (e *PostfixExporter) StartMetricCollection(ctx context.Context) {
if e.journal != nil {
e.foreverCollectFromJournal(ctx)
} else if e.tailer != nil {
e.CollectLogfileFromFile(ctx)
} }
gaugeVec := prometheus.NewGaugeVec( prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Namespace: "postfix", Namespace: "postfix",
Subsystem: "", Subsystem: "",
@ -624,20 +680,7 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) {
Help: "Whether scraping Postfix's metrics was successful.", Help: "Whether scraping Postfix's metrics was successful.",
}, },
[]string{"path"}) []string{"path"})
gauge := gaugeVec.WithLabelValues(e.logSrc.Path()) return
defer gauge.Set(0)
for {
line, err := e.logSrc.Read(ctx)
if err != nil {
if err != io.EOF {
log.Printf("Couldn't read journal: %v", err)
}
return
}
e.CollectFromLogLine(line)
gauge.Set(1)
}
} }
// Collect metrics from Postfix's showq socket and its log file. // Collect metrics from Postfix's showq socket and its log file.
@ -658,7 +701,7 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) {
e.showqPath) e.showqPath)
} }
if e.logSrc == nil { if e.tailer == nil && e.journal == nil {
return return
} }
ch <- e.cleanupProcesses ch <- e.cleanupProcesses

View File

@ -1,17 +1,18 @@
package main package main
import ( import (
"testing" "github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go" io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing"
) )
func TestPostfixExporter_CollectFromLogline(t *testing.T) { func TestPostfixExporter_CollectFromLogline(t *testing.T) {
type fields struct { type fields struct {
showqPath string showqPath string
logSrc LogSource journal *Journal
tailer *tail.Tail
cleanupProcesses prometheus.Counter cleanupProcesses prometheus.Counter
cleanupRejects prometheus.Counter cleanupRejects prometheus.Counter
cleanupNotAccepted prometheus.Counter cleanupNotAccepted prometheus.Counter
@ -172,7 +173,8 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
e := &PostfixExporter{ e := &PostfixExporter{
showqPath: tt.fields.showqPath, showqPath: tt.fields.showqPath,
logSrc: tt.fields.logSrc, journal: tt.fields.journal,
tailer: tt.fields.tailer,
cleanupProcesses: tt.fields.cleanupProcesses, cleanupProcesses: tt.fields.cleanupProcesses,
cleanupRejects: tt.fields.cleanupRejects, cleanupRejects: tt.fields.cleanupRejects,
cleanupNotAccepted: tt.fields.cleanupNotAccepted, cleanupNotAccepted: tt.fields.cleanupNotAccepted,

119
systemd.go Normal file
View File

@ -0,0 +1,119 @@
// +build !nosystemd,linux
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/alecthomas/kingpin"
"github.com/coreos/go-systemd/v22/sdjournal"
)
// Journal represents a lockable systemd journal.
type Journal struct {
*sdjournal.Journal
sync.Mutex
Path string
}
// NewJournal returns a Journal for reading journal entries.
func NewJournal(unit, slice, path string) (j *Journal, err error) {
j = new(Journal)
if path != "" {
j.Journal, err = sdjournal.NewJournalFromDir(path)
j.Path = path
} else {
j.Journal, err = sdjournal.NewJournal()
j.Path = "journald"
}
if err != nil {
return
}
if slice != "" {
err = j.AddMatch("_SYSTEMD_SLICE=" + slice)
if err != nil {
return
}
} else if unit != "" {
err = j.AddMatch("_SYSTEMD_UNIT=" + unit)
if err != nil {
return
}
}
// Start at end of journal
err = j.SeekRealtimeUsec(uint64(time.Now().UnixNano() / 1000))
if err != nil {
log.Printf("%v", err)
}
return
}
// NextMessage reads the next message from the journal.
func (j *Journal) NextMessage() (s string, c uint64, err error) {
var e *sdjournal.JournalEntry
// Read to next
c, err = j.Next()
if err != nil {
return
}
// Return when on the end of journal
if c == 0 {
return
}
// Get entry
e, err = j.GetEntry()
if err != nil {
return
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))
// Format entry
s = fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
)
return
}
// systemdFlags sets the flags for use with systemd
func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {
app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(enable)
app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(unit)
app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(slice)
app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(path)
}
// CollectLogfileFromJournal Collects entries from the systemd journal.
func (e *PostfixExporter) CollectLogfileFromJournal() error {
e.journal.Lock()
defer e.journal.Unlock()
r := e.journal.Wait(time.Duration(1) * time.Second)
if r < 0 {
log.Print("error while waiting for journal!")
}
for {
m, c, err := e.journal.NextMessage()
if err != nil {
return err
}
if c == 0 {
break
}
e.CollectFromLogLine(m)
}
return nil
}