Merge pull request #73 from kumina/docker

Docker
This commit is contained in:
Bart Vercoulen 2021-03-16 15:41:00 +01:00 committed by GitHub
commit b050cdff02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 781 additions and 249 deletions

View File

@ -5,7 +5,7 @@ This exporter provides histogram metrics for the size and age of messages stored
the mail queue. It extracts these metrics from Postfix by connecting to
a UNIX socket under `/var/spool`. It also counts events by parsing Postfix's
log entries, using regular expression matching. The log entries are retrieved from
the systemd journal or from a log file.
the systemd journal, the Docker logs, or from a log file.
## Options
@ -18,11 +18,25 @@ These options can be used when starting the `postfix_exporter`
| `--postfix.showq_path` | Path at which Postfix places its showq socket | `/var/spool/postfix/public/showq` |
| `--postfix.logfile_path` | Path where Postfix writes log entries | `/var/log/maillog` |
| `--log.unsupported` | Log all unsupported lines | `false` |
| `--systemd.enable` | Read from the systemd journal instead of log | `false` |
| `--docker.enable` | Read from the Docker logs instead of a file | `false` |
| `--docker.container.id` | The container to read Docker logs from | `postfix` |
| `--systemd.enable` | Read from the systemd journal instead of file | `false` |
| `--systemd.unit` | Name of the Postfix systemd unit | `postfix.service` |
| `--systemd.slice` | Name of the Postfix systemd slice. | `""` |
| `--systemd.journal_path` | Path to the systemd journal | `""` |
## Events from Docker
Postfix servers running in a [Docker](https://www.docker.com/)
container can be monitored using the `--docker.enable` flag. The
default container ID is `postfix`, but can be customized with the
`--docker.container.id` flag.
The default is to connect to the local Docker, but this can be
customized using [the `DOCKER_HOST` and
similar](https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient)
environment variables.
## Events from log file
The log file is tailed when processed. Rotating the log files while the exporter

7
go.mod
View File

@ -5,8 +5,15 @@ go 1.13
require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/coreos/go-systemd/v22 v22.0.0
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.4.7 // indirect
github.com/hpcloud/tail v1.0.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/prometheus/client_golang v1.4.1
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.4.0

12
go.sum
View File

@ -17,6 +17,14 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo=
github.com/docker/docker v1.13.1/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@ -53,7 +61,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -83,6 +94,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

64
logsource.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"context"
"fmt"
"io"
"github.com/alecthomas/kingpin"
)
// A LogSourceFactory provides a repository of log sources that can be
// instantiated from command line flags.
type LogSourceFactory interface {
// Init adds the factory's struct fields as flags in the
// application.
Init(*kingpin.Application)
// New attempts to create a new log source. This is called after
// flags have been parsed. Returning `nil, nil`, means the user
// didn't want this log source.
New(context.Context) (LogSourceCloser, error)
}
type LogSourceCloser interface {
io.Closer
LogSource
}
var logSourceFactories []LogSourceFactory
// RegisterLogSourceFactory can be called from module `init` functions
// to register factories.
func RegisterLogSourceFactory(lsf LogSourceFactory) {
logSourceFactories = append(logSourceFactories, lsf)
}
// InitLogSourceFactories runs Init on all factories. The
// initialization order is arbitrary, except `fileLogSourceFactory` is
// always last (the fallback). The file log source must be last since
// it's enabled by default.
func InitLogSourceFactories(app *kingpin.Application) {
RegisterLogSourceFactory(&fileLogSourceFactory{})
for _, f := range logSourceFactories {
f.Init(app)
}
}
// NewLogSourceFromFactories iterates through the factories and
// attempts to instantiate a log source. The first factory to return
// success wins.
func NewLogSourceFromFactories(ctx context.Context) (LogSourceCloser, error) {
for _, f := range logSourceFactories {
src, err := f.New(ctx)
if err != nil {
return nil, err
}
if src != nil {
return src, nil
}
}
return nil, fmt.Errorf("no log source configured")
}

96
logsource_docker.go Normal file
View File

@ -0,0 +1,96 @@
// +build !nodocker
package main
import (
"bufio"
"context"
"io"
"log"
"strings"
"github.com/alecthomas/kingpin"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
)
// A DockerLogSource reads log records from the given Docker
// journal.
type DockerLogSource struct {
client DockerClient
containerID string
reader *bufio.Reader
}
// A DockerClient is the client interface that client.Client
// provides. See https://pkg.go.dev/github.com/docker/docker/client
type DockerClient interface {
io.Closer
ContainerLogs(context.Context, string, types.ContainerLogsOptions) (io.ReadCloser, error)
}
// NewDockerLogSource returns a log source for reading Docker logs.
func NewDockerLogSource(ctx context.Context, c DockerClient, containerID string) (*DockerLogSource, error) {
r, err := c.ContainerLogs(ctx, containerID, types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Tail: "0",
})
if err != nil {
return nil, err
}
logSrc := &DockerLogSource{
client: c,
containerID: containerID,
reader: bufio.NewReader(r),
}
return logSrc, nil
}
func (s *DockerLogSource) Close() error {
return s.client.Close()
}
func (s *DockerLogSource) Path() string {
return "docker:" + s.containerID
}
func (s *DockerLogSource) Read(ctx context.Context) (string, error) {
line, err := s.reader.ReadString('\n')
if err != nil {
return "", err
}
return strings.TrimSpace(line), nil
}
// A dockerLogSourceFactory is a factory that can create
// DockerLogSources from command line flags.
type dockerLogSourceFactory struct {
enable bool
containerID string
}
func (f *dockerLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("docker.enable", "Read from Docker logs. Environment variable DOCKER_HOST can be used to change the address. See https://pkg.go.dev/github.com/docker/docker/client?tab=doc#NewEnvClient for more information.").Default("false").BoolVar(&f.enable)
app.Flag("docker.container.id", "ID/name of the Postfix Docker container.").Default("postfix").StringVar(&f.containerID)
}
func (f *dockerLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) {
if !f.enable {
return nil, nil
}
log.Println("Reading log events from Docker")
c, err := client.NewEnvClient()
if err != nil {
return nil, err
}
return NewDockerLogSource(ctx, c, f.containerID)
}
func init() {
RegisterLogSourceFactory(&dockerLogSourceFactory{})
}

79
logsource_docker_test.go Normal file
View File

@ -0,0 +1,79 @@
// +build !nodocker
package main
import (
"context"
"io"
"io/ioutil"
"strings"
"testing"
"github.com/docker/docker/api/types"
"github.com/stretchr/testify/assert"
)
func TestNewDockerLogSource(t *testing.T) {
ctx := context.Background()
c := &fakeDockerClient{}
src, err := NewDockerLogSource(ctx, c, "acontainer")
if err != nil {
t.Fatalf("NewDockerLogSource failed: %v", err)
}
assert.Equal(t, []string{"acontainer"}, c.containerLogsCalls, "A call to ContainerLogs should be made.")
if err := src.Close(); err != nil {
t.Fatalf("Close failed: %v", err)
}
assert.Equal(t, 1, c.closeCalls, "A call to Close should be made.")
}
func TestDockerLogSource_Path(t *testing.T) {
ctx := context.Background()
c := &fakeDockerClient{}
src, err := NewDockerLogSource(ctx, c, "acontainer")
if err != nil {
t.Fatalf("NewDockerLogSource failed: %v", err)
}
defer src.Close()
assert.Equal(t, "docker:acontainer", src.Path(), "Path should be set by New.")
}
func TestDockerLogSource_Read(t *testing.T) {
ctx := context.Background()
c := &fakeDockerClient{
logsReader: ioutil.NopCloser(strings.NewReader("Feb 13 23:31:30 ahost anid[123]: aline\n")),
}
src, err := NewDockerLogSource(ctx, c, "acontainer")
if err != nil {
t.Fatalf("NewDockerLogSource failed: %v", err)
}
defer src.Close()
s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}
type fakeDockerClient struct {
logsReader io.ReadCloser
containerLogsCalls []string
closeCalls int
}
func (c *fakeDockerClient) ContainerLogs(ctx context.Context, containerID string, opts types.ContainerLogsOptions) (io.ReadCloser, error) {
c.containerLogsCalls = append(c.containerLogsCalls, containerID)
return c.logsReader, nil
}
func (c *fakeDockerClient) Close() error {
c.closeCalls++
return nil
}

78
logsource_file.go Normal file
View File

@ -0,0 +1,78 @@
package main
import (
"context"
"io"
"log"
"github.com/alecthomas/kingpin"
"github.com/hpcloud/tail"
)
// A FileLogSource can read lines from a file.
type FileLogSource struct {
tailer *tail.Tail
}
// NewFileLogSource creates a new log source, tailing the given file.
func NewFileLogSource(path string) (*FileLogSource, error) {
tailer, err := tail.TailFile(path, tail.Config{
ReOpen: true, // reopen the file if it's rotated
MustExist: true, // fail immediately if the file is missing or has incorrect permissions
Follow: true, // run in follow mode
Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file
Logger: tail.DiscardingLogger,
})
if err != nil {
return nil, err
}
return &FileLogSource{tailer}, nil
}
func (s *FileLogSource) Close() error {
defer s.tailer.Cleanup()
go func() {
// Stop() waits for the tailer goroutine to shut down, but it
// can be blocking on sending on the Lines channel...
for range s.tailer.Lines {
}
}()
return s.tailer.Stop()
}
func (s *FileLogSource) Path() string {
return s.tailer.Filename
}
func (s *FileLogSource) Read(ctx context.Context) (string, error) {
select {
case line, ok := <-s.tailer.Lines:
if !ok {
return "", io.EOF
}
return line.Text, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// A fileLogSourceFactory is a factory than can create log sources
// from command line flags.
//
// Because this factory is enabled by default, it must always be
// registered last.
type fileLogSourceFactory struct {
path string
}
func (f *fileLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").StringVar(&f.path)
}
func (f *fileLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) {
if f.path == "" {
return nil, nil
}
log.Printf("Reading log events from %s", f.path)
return NewFileLogSource(f.path)
}

87
logsource_file_test.go Normal file
View File

@ -0,0 +1,87 @@
package main
import (
"context"
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestFileLogSource_Path(t *testing.T) {
path, close, err := setupFakeLogFile()
if err != nil {
t.Fatalf("setupFakeTailer failed: %v", err)
}
defer close()
src, err := NewFileLogSource(path)
if err != nil {
t.Fatalf("NewFileLogSource failed: %v", err)
}
defer src.Close()
assert.Equal(t, path, src.Path(), "Path should be set by New.")
}
func TestFileLogSource_Read(t *testing.T) {
ctx := context.Background()
path, close, err := setupFakeLogFile()
if err != nil {
t.Fatalf("setupFakeTailer failed: %v", err)
}
defer close()
src, err := NewFileLogSource(path)
if err != nil {
t.Fatalf("NewFileLogSource failed: %v", err)
}
defer src.Close()
s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}
func setupFakeLogFile() (string, func(), error) {
f, err := ioutil.TempFile("", "filelogsource")
if err != nil {
return "", nil, err
}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
defer os.Remove(f.Name())
defer f.Close()
for {
// The tailer seeks to the end and then does a
// follow. Keep writing lines so we know it wakes up and
// returns lines.
fmt.Fprintln(f, "Feb 13 23:31:30 ahost anid[123]: aline")
select {
case <-time.After(10 * time.Millisecond):
// continue
case <-ctx.Done():
return
}
}
}()
return f.Name(), func() {
cancel()
wg.Wait()
}, nil
}

143
logsource_systemd.go Normal file
View File

@ -0,0 +1,143 @@
// +build !nosystemd,linux
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"github.com/alecthomas/kingpin"
"github.com/coreos/go-systemd/v22/sdjournal"
)
// timeNow is a test fake injection point.
var timeNow = time.Now
// A SystemdLogSource reads log records from the given Systemd
// journal.
type SystemdLogSource struct {
journal SystemdJournal
path string
}
// A SystemdJournal is the journal interface that sdjournal.Journal
// provides. See https://pkg.go.dev/github.com/coreos/go-systemd/sdjournal?tab=doc
type SystemdJournal interface {
io.Closer
AddMatch(match string) error
GetEntry() (*sdjournal.JournalEntry, error)
Next() (uint64, error)
SeekRealtimeUsec(usec uint64) error
Wait(timeout time.Duration) int
}
// NewSystemdLogSource returns a log source for reading Systemd
// journal entries. `unit` and `slice` provide filtering if non-empty
// (with `slice` taking precedence).
func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLogSource, error) {
logSrc := &SystemdLogSource{journal: j, path: path}
var err error
if slice != "" {
err = logSrc.journal.AddMatch("_SYSTEMD_SLICE=" + slice)
} else if unit != "" {
err = logSrc.journal.AddMatch("_SYSTEMD_UNIT=" + unit)
}
if err != nil {
logSrc.journal.Close()
return nil, err
}
// Start at end of journal
if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil {
logSrc.journal.Close()
return nil, err
}
if r := logSrc.journal.Wait(1 * time.Second); r < 0 {
logSrc.journal.Close()
return nil, err
}
return logSrc, nil
}
func (s *SystemdLogSource) Close() error {
return s.journal.Close()
}
func (s *SystemdLogSource) Path() string {
return s.path
}
func (s *SystemdLogSource) Read(ctx context.Context) (string, error) {
c, err := s.journal.Next()
if err != nil {
return "", err
}
if c == 0 {
return "", io.EOF
}
e, err := s.journal.GetEntry()
if err != nil {
return "", err
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))
return fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
), nil
}
// A systemdLogSourceFactory is a factory that can create
// SystemdLogSources from command line flags.
type systemdLogSourceFactory struct {
enable bool
unit, slice, path string
}
func (f *systemdLogSourceFactory) Init(app *kingpin.Application) {
app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(&f.enable)
app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(&f.unit)
app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(&f.slice)
app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(&f.path)
}
func (f *systemdLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) {
if !f.enable {
return nil, nil
}
log.Println("Reading log events from systemd")
j, path, err := newSystemdJournal(f.path)
if err != nil {
return nil, err
}
return NewSystemdLogSource(j, path, f.unit, f.slice)
}
// newSystemdJournal creates a journal handle. It returns the handle
// and a string representation of it. If `path` is empty, it connects
// to the local journald.
func newSystemdJournal(path string) (*sdjournal.Journal, string, error) {
if path != "" {
j, err := sdjournal.NewJournalFromDir(path)
return j, path, err
}
j, err := sdjournal.NewJournal()
return j, "journald", err
}
func init() {
RegisterLogSourceFactory(&systemdLogSourceFactory{})
}

150
logsource_systemd_test.go Normal file
View File

@ -0,0 +1,150 @@
// +build !nosystemd,linux
package main
import (
"context"
"io"
"os"
"testing"
"time"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/stretchr/testify/assert"
)
func TestNewSystemdLogSource(t *testing.T) {
j := &fakeSystemdJournal{}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.")
assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.")
assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.")
if err := src.Close(); err != nil {
t.Fatalf("Close failed: %v", err)
}
assert.Equal(t, 1, j.closeCalls, "A call to Close should be made.")
}
func TestSystemdLogSource_Path(t *testing.T) {
j := &fakeSystemdJournal{}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
defer src.Close()
assert.Equal(t, "apath", src.Path(), "Path should be set by New.")
}
func TestSystemdLogSource_Read(t *testing.T) {
ctx := context.Background()
j := &fakeSystemdJournal{
getEntryValues: []sdjournal.JournalEntry{
{
Fields: map[string]string{
"_HOSTNAME": "ahost",
"SYSLOG_IDENTIFIER": "anid",
"_PID": "123",
"MESSAGE": "aline",
},
RealtimeTimestamp: 1234567890000000,
},
},
nextValues: []uint64{1},
}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
defer src.Close()
s, err := src.Read(ctx)
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}
func TestSystemdLogSource_ReadEOF(t *testing.T) {
ctx := context.Background()
j := &fakeSystemdJournal{
nextValues: []uint64{0},
}
src, err := NewSystemdLogSource(j, "apath", "aunit", "aslice")
if err != nil {
t.Fatalf("NewSystemdLogSource failed: %v", err)
}
defer src.Close()
_, err = src.Read(ctx)
assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.")
}
func TestMain(m *testing.M) {
// We compare Unix timestamps to date strings, so make it deterministic.
os.Setenv("TZ", "UTC")
timeNow = func() time.Time { return time.Date(2009, 2, 13, 23, 31, 30, 0, time.UTC) }
defer func() {
timeNow = time.Now
}()
os.Exit(m.Run())
}
type fakeSystemdJournal struct {
getEntryValues []sdjournal.JournalEntry
getEntryError error
nextValues []uint64
nextError error
addMatchCalls []string
closeCalls int
seekRealtimeUsecCalls []uint64
waitCalls []time.Duration
}
func (j *fakeSystemdJournal) AddMatch(match string) error {
j.addMatchCalls = append(j.addMatchCalls, match)
return nil
}
func (j *fakeSystemdJournal) Close() error {
j.closeCalls++
return nil
}
func (j *fakeSystemdJournal) GetEntry() (*sdjournal.JournalEntry, error) {
if len(j.getEntryValues) == 0 {
return nil, j.getEntryError
}
e := j.getEntryValues[0]
j.getEntryValues = j.getEntryValues[1:]
return &e, nil
}
func (j *fakeSystemdJournal) Next() (uint64, error) {
if len(j.nextValues) == 0 {
return 0, j.nextError
}
v := j.nextValues[0]
j.nextValues = j.nextValues[1:]
return v, nil
}
func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error {
j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec)
return nil
}
func (j *fakeSystemdJournal) Wait(timeout time.Duration) int {
j.waitCalls = append(j.waitCalls, timeout)
return 0
}

24
main.go
View File

@ -13,36 +13,26 @@ import (
func main() {
var (
ctx = context.Background()
app = kingpin.New("postfix_exporter", "Prometheus metrics exporter for postfix")
listenAddress = app.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9154").String()
metricsPath = app.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
postfixShowqPath = app.Flag("postfix.showq_path", "Path at which Postfix places its showq socket.").Default("/var/spool/postfix/public/showq").String()
postfixLogfilePath = app.Flag("postfix.logfile_path", "Path where Postfix writes log entries.").Default("/var/log/maillog").String()
logUnsupportedLines = app.Flag("log.unsupported", "Log all unsupported lines.").Bool()
systemdEnable bool
systemdUnit, systemdSlice, systemdJournalPath string
)
systemdFlags(&systemdEnable, &systemdUnit, &systemdSlice, &systemdJournalPath, app)
InitLogSourceFactories(app)
kingpin.MustParse(app.Parse(os.Args[1:]))
var journal *Journal
if systemdEnable {
var err error
journal, err = NewJournal(systemdUnit, systemdSlice, systemdJournalPath)
logSrc, err := NewLogSourceFromFactories(ctx)
if err != nil {
log.Fatalf("Error opening systemd journal: %s", err)
}
defer journal.Close()
log.Println("Reading log events from systemd")
} else {
log.Printf("Reading log events from %v", *postfixLogfilePath)
log.Fatalf("Error opening log source: %s", err)
}
defer logSrc.Close()
exporter, err := NewPostfixExporter(
*postfixShowqPath,
*postfixLogfilePath,
journal,
logSrc,
*logUnsupportedLines,
)
if err != nil {
@ -64,7 +54,7 @@ func main() {
panic(err)
}
})
ctx, cancelFunc := context.WithCancel(context.Background())
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
go exporter.StartMetricCollection(ctx)
log.Print("Listening on ", *listenAddress)

View File

@ -1,25 +0,0 @@
// +build nosystemd !linux
// This file contains stubs to support non-systemd use
package main
import (
"io"
"github.com/alecthomas/kingpin"
)
type Journal struct {
io.Closer
Path string
}
func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {}
func NewJournal(unit, slice, path string) (*Journal, error) {
return nil, nil
}
func (e *PostfixExporter) CollectLogfileFromJournal() error {
return nil
}

View File

@ -27,7 +27,6 @@ import (
"strings"
"time"
"github.com/hpcloud/tail"
"github.com/prometheus/client_golang/prometheus"
)
@ -42,8 +41,7 @@ var (
// Postfix Prometheus metrics exporter across scrapes.
type PostfixExporter struct {
showqPath string
journal *Journal
tailer *tail.Tail
logSrc LogSource
logUnsupportedLines bool
// Metrics that should persist after refreshes, based on logs.
@ -72,6 +70,16 @@ type PostfixExporter struct {
opendkimSignatureAdded *prometheus.CounterVec
}
// A LogSource is an interface to read log lines.
type LogSource interface {
// Path returns a representation of the log location.
Path() string
// Read returns the next log line. Returns `io.EOF` at the end of
// the log.
Read(context.Context) (string, error)
}
// CollectShowqFromReader parses the output of Postfix's 'showq' command
// and turns it into metrics.
//
@ -420,50 +428,13 @@ func addToHistogramVec(h *prometheus.HistogramVec, value, fieldName string, labe
h.WithLabelValues(labels...).Observe(float)
}
// CollectLogfileFromFile tails a Postfix log file and collects entries from it.
func (e *PostfixExporter) CollectLogfileFromFile(ctx context.Context) {
gaugeVec := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"})
gauge := gaugeVec.WithLabelValues(e.tailer.Filename)
for {
select {
case line := <-e.tailer.Lines:
e.CollectFromLogLine(line.Text)
case <-ctx.Done():
gauge.Set(0)
return
}
gauge.Set(1)
}
}
// NewPostfixExporter creates a new Postfix exporter instance.
func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal, logUnsupportedLines bool) (*PostfixExporter, error) {
var tailer *tail.Tail
if logfilePath != "" {
var err error
tailer, err = tail.TailFile(logfilePath, tail.Config{
ReOpen: true, // reopen the file if it's rotated
MustExist: true, // fail immediately if the file is missing or has incorrect permissions
Follow: true, // run in follow mode
Location: &tail.SeekInfo{Whence: io.SeekEnd}, // seek to end of file
})
if err != nil {
return nil, err
}
}
func NewPostfixExporter(showqPath string, logSrc LogSource, logUnsupportedLines bool) (*PostfixExporter, error) {
timeBuckets := []float64{1e-3, 1e-2, 1e-1, 1.0, 10, 1 * 60, 1 * 60 * 60, 24 * 60 * 60, 2 * 24 * 60 * 60}
return &PostfixExporter{
logUnsupportedLines: logUnsupportedLines,
showqPath: showqPath,
tailer: tailer,
journal: journal,
logSrc: logSrc,
cleanupProcesses: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "postfix",
@ -613,7 +584,7 @@ func NewPostfixExporter(showqPath string, logfilePath string, journal *Journal,
func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) {
ch <- postfixUpDesc
if e.tailer == nil && e.journal == nil {
if e.logSrc == nil {
return
}
ch <- e.cleanupProcesses.Desc()
@ -641,38 +612,12 @@ func (e *PostfixExporter) Describe(ch chan<- *prometheus.Desc) {
e.opendkimSignatureAdded.Describe(ch)
}
func (e *PostfixExporter) foreverCollectFromJournal(ctx context.Context) {
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
Name: "up",
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"}).WithLabelValues(e.journal.Path)
select {
case <-ctx.Done():
gauge.Set(0)
return
default:
err := e.CollectLogfileFromJournal()
if err != nil {
log.Printf("Couldn't read journal: %v", err)
gauge.Set(0)
} else {
gauge.Set(1)
}
}
}
func (e *PostfixExporter) StartMetricCollection(ctx context.Context) {
if e.journal != nil {
e.foreverCollectFromJournal(ctx)
} else if e.tailer != nil {
e.CollectLogfileFromFile(ctx)
if e.logSrc == nil {
return
}
prometheus.NewGaugeVec(
gaugeVec := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "postfix",
Subsystem: "",
@ -680,7 +625,20 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) {
Help: "Whether scraping Postfix's metrics was successful.",
},
[]string{"path"})
gauge := gaugeVec.WithLabelValues(e.logSrc.Path())
defer gauge.Set(0)
for {
line, err := e.logSrc.Read(ctx)
if err != nil {
if err != io.EOF {
log.Printf("Couldn't read journal: %v", err)
}
return
}
e.CollectFromLogLine(line)
gauge.Set(1)
}
}
// Collect metrics from Postfix's showq socket and its log file.
@ -701,7 +659,7 @@ func (e *PostfixExporter) Collect(ch chan<- prometheus.Metric) {
e.showqPath)
}
if e.tailer == nil && e.journal == nil {
if e.logSrc == nil {
return
}
ch <- e.cleanupProcesses

View File

@ -1,18 +1,17 @@
package main
import (
"github.com/hpcloud/tail"
"testing"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"testing"
)
func TestPostfixExporter_CollectFromLogline(t *testing.T) {
type fields struct {
showqPath string
journal *Journal
tailer *tail.Tail
logSrc LogSource
cleanupProcesses prometheus.Counter
cleanupRejects prometheus.Counter
cleanupNotAccepted prometheus.Counter
@ -173,8 +172,7 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
e := &PostfixExporter{
showqPath: tt.fields.showqPath,
journal: tt.fields.journal,
tailer: tt.fields.tailer,
logSrc: tt.fields.logSrc,
cleanupProcesses: tt.fields.cleanupProcesses,
cleanupRejects: tt.fields.cleanupRejects,
cleanupNotAccepted: tt.fields.cleanupNotAccepted,

View File

@ -1,119 +0,0 @@
// +build !nosystemd,linux
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/alecthomas/kingpin"
"github.com/coreos/go-systemd/v22/sdjournal"
)
// Journal represents a lockable systemd journal.
type Journal struct {
*sdjournal.Journal
sync.Mutex
Path string
}
// NewJournal returns a Journal for reading journal entries.
func NewJournal(unit, slice, path string) (j *Journal, err error) {
j = new(Journal)
if path != "" {
j.Journal, err = sdjournal.NewJournalFromDir(path)
j.Path = path
} else {
j.Journal, err = sdjournal.NewJournal()
j.Path = "journald"
}
if err != nil {
return
}
if slice != "" {
err = j.AddMatch("_SYSTEMD_SLICE=" + slice)
if err != nil {
return
}
} else if unit != "" {
err = j.AddMatch("_SYSTEMD_UNIT=" + unit)
if err != nil {
return
}
}
// Start at end of journal
err = j.SeekRealtimeUsec(uint64(time.Now().UnixNano() / 1000))
if err != nil {
log.Printf("%v", err)
}
return
}
// NextMessage reads the next message from the journal.
func (j *Journal) NextMessage() (s string, c uint64, err error) {
var e *sdjournal.JournalEntry
// Read to next
c, err = j.Next()
if err != nil {
return
}
// Return when on the end of journal
if c == 0 {
return
}
// Get entry
e, err = j.GetEntry()
if err != nil {
return
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))
// Format entry
s = fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
)
return
}
// systemdFlags sets the flags for use with systemd
func systemdFlags(enable *bool, unit, slice, path *string, app *kingpin.Application) {
app.Flag("systemd.enable", "Read from the systemd journal instead of log").Default("false").BoolVar(enable)
app.Flag("systemd.unit", "Name of the Postfix systemd unit.").Default("postfix.service").StringVar(unit)
app.Flag("systemd.slice", "Name of the Postfix systemd slice. Overrides the systemd unit.").Default("").StringVar(slice)
app.Flag("systemd.journal_path", "Path to the systemd journal").Default("").StringVar(path)
}
// CollectLogfileFromJournal Collects entries from the systemd journal.
func (e *PostfixExporter) CollectLogfileFromJournal() error {
e.journal.Lock()
defer e.journal.Unlock()
r := e.journal.Wait(time.Duration(1) * time.Second)
if r < 0 {
log.Print("error while waiting for journal!")
}
for {
m, c, err := e.journal.NextMessage()
if err != nil {
return err
}
if c == 0 {
break
}
e.CollectFromLogLine(m)
}
return nil
}