created docker-prometheus compose file and edited the Prometheus yml file

This commit is contained in:
Brian Christner
2015-08-18 14:36:46 +02:00
parent 4b9b5b70e3
commit bf34f627cf
664 changed files with 150581 additions and 0 deletions

View File

@ -0,0 +1,293 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/prometheus/log"
consul "github.com/hashicorp/consul/api"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
const (
consulWatchTimeout = 30 * time.Second
consulRetryInterval = 15 * time.Second
// ConsuleAddressLabel is the name for the label containing a target's address.
ConsulAddressLabel = clientmodel.MetaLabelPrefix + "consul_address"
// ConsuleNodeLabel is the name for the label containing a target's node name.
ConsulNodeLabel = clientmodel.MetaLabelPrefix + "consul_node"
// ConsulTagsLabel is the name of the label containing the tags assigned to the target.
ConsulTagsLabel = clientmodel.MetaLabelPrefix + "consul_tags"
// ConsulServiceLabel is the name of the label containing the service name.
ConsulServiceLabel = clientmodel.MetaLabelPrefix + "consul_service"
// ConsulServiceAddressLabel is the name of the label containing the (optional) service address.
ConsulServiceAddressLabel = clientmodel.MetaLabelPrefix + "consul_service_address"
// ConsulServicePortLabel is the name of the label containing the service port.
ConsulServicePortLabel = clientmodel.MetaLabelPrefix + "consul_service_port"
// ConsulDCLabel is the name of the label containing the datacenter ID.
ConsulDCLabel = clientmodel.MetaLabelPrefix + "consul_dc"
)
// ConsulDiscovery retrieves target information from a Consul server
// and updates them via watches.
type ConsulDiscovery struct {
client *consul.Client
clientConf *consul.Config
clientDatacenter string
tagSeparator string
scrapedServices map[string]struct{}
mu sync.RWMutex
services map[string]*consulService
}
// consulService contains data belonging to the same service.
type consulService struct {
name string
tgroup *config.TargetGroup
lastIndex uint64
removed bool
running bool
done chan struct{}
}
// NewConsulDiscovery returns a new ConsulDiscovery for the given config.
func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery {
clientConf := &consul.Config{
Address: conf.Server,
Scheme: conf.Scheme,
Datacenter: conf.Datacenter,
Token: conf.Token,
HttpAuth: &consul.HttpBasicAuth{
Username: conf.Username,
Password: conf.Password,
},
}
client, err := consul.NewClient(clientConf)
if err != nil {
// NewClient always returns a nil error.
panic(fmt.Errorf("discovery.NewConsulDiscovery: %s", err))
}
cd := &ConsulDiscovery{
client: client,
clientConf: clientConf,
tagSeparator: conf.TagSeparator,
scrapedServices: map[string]struct{}{},
services: map[string]*consulService{},
}
// If the datacenter isn't set in the clientConf, let's get it from the local Consul agent
// (Consul default is to use local node's datacenter if one isn't given for a query).
if clientConf.Datacenter == "" {
info, err := client.Agent().Self()
if err != nil {
panic(fmt.Errorf("discovery.NewConsulDiscovery: %s", err))
}
cd.clientDatacenter = info["Config"]["Datacenter"].(string)
} else {
cd.clientDatacenter = clientConf.Datacenter
}
for _, name := range conf.Services {
cd.scrapedServices[name] = struct{}{}
}
return cd
}
// Sources implements the TargetProvider interface.
func (cd *ConsulDiscovery) Sources() []string {
clientConf := *cd.clientConf
clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second}
client, err := consul.NewClient(&clientConf)
if err != nil {
// NewClient always returns a nil error.
panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err))
}
srvs, _, err := client.Catalog().Services(nil)
if err != nil {
log.Errorf("Error refreshing service list: %s", err)
return nil
}
cd.mu.Lock()
defer cd.mu.Unlock()
srcs := make([]string, 0, len(srvs))
for name := range srvs {
if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) == 0 || ok {
srcs = append(srcs, name)
}
}
return srcs
}
// Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
defer cd.stop()
update := make(chan *consulService, 10)
go cd.watchServices(update, done)
for {
select {
case <-done:
return
case srv := <-update:
if srv.removed {
close(srv.done)
// Send clearing update.
ch <- &config.TargetGroup{Source: srv.name}
break
}
// Launch watcher for the service.
if !srv.running {
go cd.watchService(srv, ch)
srv.running = true
}
}
}
}
func (cd *ConsulDiscovery) stop() {
// The lock prevents Run from terminating while the watchers attempt
// to send on their channels.
cd.mu.Lock()
defer cd.mu.Unlock()
for _, srv := range cd.services {
close(srv.done)
}
}
// watchServices retrieves updates from Consul's services endpoint and sends
// potential updates to the update channel.
func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-chan struct{}) {
var lastIndex uint64
for {
catalog := cd.client.Catalog()
srvs, meta, err := catalog.Services(&consul.QueryOptions{
WaitIndex: lastIndex,
WaitTime: consulWatchTimeout,
})
if err != nil {
log.Errorf("Error refreshing service list: %s", err)
time.Sleep(consulRetryInterval)
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == lastIndex {
continue
}
lastIndex = meta.LastIndex
cd.mu.Lock()
select {
case <-done:
cd.mu.Unlock()
return
default:
// Continue.
}
// Check for new services.
for name := range srvs {
if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok {
continue
}
srv, ok := cd.services[name]
if !ok {
srv = &consulService{
name: name,
tgroup: &config.TargetGroup{},
done: make(chan struct{}),
}
srv.tgroup.Source = name
cd.services[name] = srv
}
srv.tgroup.Labels = clientmodel.LabelSet{
ConsulServiceLabel: clientmodel.LabelValue(name),
ConsulDCLabel: clientmodel.LabelValue(cd.clientDatacenter),
}
update <- srv
}
// Check for removed services.
for name, srv := range cd.services {
if _, ok := srvs[name]; !ok {
srv.removed = true
update <- srv
delete(cd.services, name)
}
}
cd.mu.Unlock()
}
}
// watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) {
catalog := cd.client.Catalog()
for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
WaitIndex: srv.lastIndex,
WaitTime: consulWatchTimeout,
})
if err != nil {
log.Errorf("Error refreshing service %s: %s", srv.name, err)
time.Sleep(consulRetryInterval)
continue
}
// If the index equals the previous one, the watch timed out with no update.
if meta.LastIndex == srv.lastIndex {
continue
}
srv.lastIndex = meta.LastIndex
srv.tgroup.Targets = make([]clientmodel.LabelSet, 0, len(nodes))
for _, node := range nodes {
addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort)
// We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions.
tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator
srv.tgroup.Targets = append(srv.tgroup.Targets, clientmodel.LabelSet{
clientmodel.AddressLabel: clientmodel.LabelValue(addr),
ConsulAddressLabel: clientmodel.LabelValue(node.Address),
ConsulNodeLabel: clientmodel.LabelValue(node.Node),
ConsulTagsLabel: clientmodel.LabelValue(tags),
ConsulServiceAddressLabel: clientmodel.LabelValue(node.ServiceAddress),
ConsulServicePortLabel: clientmodel.LabelValue(strconv.Itoa(node.ServicePort)),
})
}
cd.mu.Lock()
select {
case <-srv.done:
cd.mu.Unlock()
return
default:
// Continue.
}
ch <- srv.tgroup
cd.mu.Unlock()
}
}

View File

@ -0,0 +1,239 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/miekg/dns"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/log"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
const (
resolvConf = "/etc/resolv.conf"
DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_name"
// Constants for instrumentation.
namespace = "prometheus"
interval = "interval"
)
var (
dnsSDLookupsCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "dns_sd_lookups_total",
Help: "The number of DNS-SD lookups.",
})
dnsSDLookupFailuresCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "dns_sd_lookup_failures_total",
Help: "The number of DNS-SD lookup failures.",
})
)
func init() {
prometheus.MustRegister(dnsSDLookupFailuresCount)
prometheus.MustRegister(dnsSDLookupsCount)
}
// DNSDiscovery periodically performs DNS-SD requests. It implements
// the TargetProvider interface.
type DNSDiscovery struct {
names []string
done chan struct{}
interval time.Duration
m sync.RWMutex
port int
qtype uint16
}
// NewDNSDiscovery returns a new DNSDiscovery which periodically refreshes its targets.
func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
qtype := dns.TypeSRV
switch strings.ToUpper(conf.Type) {
case "A":
qtype = dns.TypeA
case "AAAA":
qtype = dns.TypeAAAA
case "SRV":
qtype = dns.TypeSRV
}
return &DNSDiscovery{
names: conf.Names,
done: make(chan struct{}),
interval: time.Duration(conf.RefreshInterval),
qtype: qtype,
port: conf.Port,
}
}
// Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
ticker := time.NewTicker(dd.interval)
defer ticker.Stop()
// Get an initial set right away.
dd.refreshAll(ch)
for {
select {
case <-ticker.C:
dd.refreshAll(ch)
case <-done:
return
}
}
}
// Sources implements the TargetProvider interface.
func (dd *DNSDiscovery) Sources() []string {
var srcs []string
for _, name := range dd.names {
srcs = append(srcs, name)
}
return srcs
}
func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) {
var wg sync.WaitGroup
wg.Add(len(dd.names))
for _, name := range dd.names {
go func(n string) {
if err := dd.refresh(n, ch); err != nil {
log.Errorf("Error refreshing DNS targets: %s", err)
}
wg.Done()
}(name)
}
wg.Wait()
}
func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype)
dnsSDLookupsCount.Inc()
if err != nil {
dnsSDLookupFailuresCount.Inc()
return err
}
tg := &config.TargetGroup{}
for _, record := range response.Answer {
target := clientmodel.LabelValue("")
switch addr := record.(type) {
case *dns.SRV:
// Remove the final dot from rooted DNS names to make them look more usual.
addr.Target = strings.TrimRight(addr.Target, ".")
target = clientmodel.LabelValue(fmt.Sprintf("%s:%d", addr.Target, addr.Port))
case *dns.A:
target = clientmodel.LabelValue(fmt.Sprintf("%s:%d", addr.A, dd.port))
case *dns.AAAA:
target = clientmodel.LabelValue(fmt.Sprintf("%s:%d", addr.AAAA, dd.port))
default:
log.Warnf("%q is not a valid SRV record", record)
continue
}
tg.Targets = append(tg.Targets, clientmodel.LabelSet{
clientmodel.AddressLabel: target,
DNSNameLabel: clientmodel.LabelValue(name),
})
}
tg.Source = name
ch <- tg
return nil
}
func lookupAll(name string, qtype uint16) (*dns.Msg, error) {
conf, err := dns.ClientConfigFromFile(resolvConf)
if err != nil {
return nil, fmt.Errorf("could not load resolv.conf: %s", err)
}
client := &dns.Client{}
response := &dns.Msg{}
for _, server := range conf.Servers {
servAddr := net.JoinHostPort(server, conf.Port)
for _, suffix := range conf.Search {
response, err = lookup(name, qtype, client, servAddr, suffix, false)
if err != nil {
log.Warnf("resolving %s.%s failed: %s", name, suffix, err)
continue
}
if len(response.Answer) > 0 {
return response, nil
}
}
response, err = lookup(name, qtype, client, servAddr, "", false)
if err == nil {
return response, nil
}
}
return response, fmt.Errorf("could not resolve %s: No server responded", name)
}
func lookup(name string, queryType uint16, client *dns.Client, servAddr string, suffix string, edns bool) (*dns.Msg, error) {
msg := &dns.Msg{}
lname := strings.Join([]string{name, suffix}, ".")
msg.SetQuestion(dns.Fqdn(lname), queryType)
if edns {
opt := &dns.OPT{
Hdr: dns.RR_Header{
Name: ".",
Rrtype: dns.TypeOPT,
},
}
opt.SetUDPSize(dns.DefaultMsgSize)
msg.Extra = append(msg.Extra, opt)
}
response, _, err := client.Exchange(msg, servAddr)
if err != nil {
return nil, err
}
if msg.Id != response.Id {
return nil, fmt.Errorf("DNS ID mismatch, request: %d, response: %d", msg.Id, response.Id)
}
if response.MsgHdr.Truncated {
if client.Net == "tcp" {
return nil, fmt.Errorf("got truncated message on tcp")
}
if edns { // Truncated even though EDNS is used
client.Net = "tcp"
}
return lookup(name, queryType, client, servAddr, suffix, !edns)
}
return response, nil
}

View File

@ -0,0 +1,252 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"time"
"github.com/prometheus/log"
"gopkg.in/fsnotify.v1"
"gopkg.in/yaml.v2"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
const FileSDFilepathLabel = clientmodel.MetaLabelPrefix + "filepath"
// FileDiscovery provides service discovery functionality based
// on files that contain target groups in JSON or YAML format. Refreshing
// happens using file watches and periodic refreshes.
type FileDiscovery struct {
paths []string
watcher *fsnotify.Watcher
interval time.Duration
// lastRefresh stores which files were found during the last refresh
// and how many target groups they contained.
// This is used to detect deleted target groups.
lastRefresh map[string]int
}
// NewFileDiscovery returns a new file discovery for the given paths.
func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
return &FileDiscovery{
paths: conf.Names,
interval: time.Duration(conf.RefreshInterval),
}
}
// Sources implements the TargetProvider interface.
func (fd *FileDiscovery) Sources() []string {
var srcs []string
// As we allow multiple target groups per file we have no choice
// but to parse them all.
for _, p := range fd.listFiles() {
tgroups, err := readFile(p)
if err != nil {
log.Errorf("Error reading file %q: ", p, err)
}
for _, tg := range tgroups {
srcs = append(srcs, tg.Source)
}
}
return srcs
}
// listFiles returns a list of all files that match the configured patterns.
func (fd *FileDiscovery) listFiles() []string {
var paths []string
for _, p := range fd.paths {
files, err := filepath.Glob(p)
if err != nil {
log.Errorf("Error expanding glob %q: %s", p, err)
continue
}
paths = append(paths, files...)
}
return paths
}
// watchFiles sets watches on all full paths or directories that were configured for
// this file discovery.
func (fd *FileDiscovery) watchFiles() {
if fd.watcher == nil {
panic("no watcher configured")
}
for _, p := range fd.paths {
if idx := strings.LastIndex(p, "/"); idx > -1 {
p = p[:idx]
} else {
p = "./"
}
if err := fd.watcher.Add(p); err != nil {
log.Errorf("Error adding file watch for %q: %s", p, err)
}
}
}
// Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
defer fd.stop()
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Errorf("Error creating file watcher: %s", err)
return
}
fd.watcher = watcher
fd.refresh(ch)
ticker := time.NewTicker(fd.interval)
defer ticker.Stop()
for {
// Stopping has priority over refreshing. Thus we wrap the actual select
// clause to always catch done signals.
select {
case <-done:
return
default:
select {
case <-done:
return
case event := <-fd.watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out.
if len(event.Name) == 0 {
break
}
// Everything but a chmod requires rereading.
if event.Op^fsnotify.Chmod == 0 {
break
}
// Changes to a file can spawn various sequences of events with
// different combinations of operations. For all practical purposes
// this is inaccurate.
// The most reliable solution is to reload everything if anything happens.
fd.refresh(ch)
case <-ticker.C:
// Setting a new watch after an update might fail. Make sure we don't lose
// those files forever.
fd.refresh(ch)
case err := <-fd.watcher.Errors:
if err != nil {
log.Errorf("Error on file watch: %s", err)
}
}
}
}
}
// refresh reads all files matching the discoveries patterns and sends the respective
// updated target groups through the channel.
func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
ref := map[string]int{}
for _, p := range fd.listFiles() {
tgroups, err := readFile(p)
if err != nil {
log.Errorf("Error reading file %q: %s", p, err)
// Prevent deletion down below.
ref[p] = fd.lastRefresh[p]
continue
}
for _, tg := range tgroups {
ch <- tg
}
ref[p] = len(tgroups)
}
// Send empty updates for sources that disappeared.
for f, n := range fd.lastRefresh {
m, ok := ref[f]
if !ok || n > m {
for i := m; i < n; i++ {
ch <- &config.TargetGroup{Source: fileSource(f, i)}
}
}
}
fd.lastRefresh = ref
fd.watchFiles()
}
// fileSource returns a source ID for the i-th target group in the file.
func fileSource(filename string, i int) string {
return fmt.Sprintf("%s:%d", filename, i)
}
// stop shuts down the file watcher.
func (fd *FileDiscovery) stop() {
log.Debugf("Stopping file discovery for %s...", fd.paths)
// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-fd.watcher.Errors:
case <-fd.watcher.Events:
// Drain all events and errors.
default:
return
}
}
}()
fd.watcher.Close()
log.Debugf("File discovery for %s stopped.", fd.paths)
}
// readFile reads a JSON or YAML list of targets groups from the file, depending on its
// file extension. It returns full configuration target groups.
func readFile(filename string) ([]*config.TargetGroup, error) {
content, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
var targetGroups []*config.TargetGroup
switch ext := filepath.Ext(filename); strings.ToLower(ext) {
case ".json":
if err := json.Unmarshal(content, &targetGroups); err != nil {
return nil, err
}
case ".yml", ".yaml":
if err := yaml.Unmarshal(content, &targetGroups); err != nil {
return nil, err
}
default:
panic(fmt.Errorf("retrieval.FileDiscovery.readFile: unhandled file extension %q", ext))
}
for i, tg := range targetGroups {
tg.Source = fileSource(filename, i)
if tg.Labels == nil {
tg.Labels = clientmodel.LabelSet{}
}
tg.Labels[FileSDFilepathLabel] = clientmodel.LabelValue(filename)
}
return targetGroups, nil
}

View File

@ -0,0 +1,109 @@
package discovery
import (
"fmt"
"io"
"os"
"testing"
"time"
"github.com/prometheus/prometheus/config"
)
func TestFileSD(t *testing.T) {
defer os.Remove("fixtures/_test.yml")
defer os.Remove("fixtures/_test.json")
testFileSD(t, ".yml")
testFileSD(t, ".json")
}
func testFileSD(t *testing.T, ext string) {
// As interval refreshing is more of a fallback, we only want to test
// whether file watches work as expected.
var conf config.FileSDConfig
conf.Names = []string{"fixtures/_*" + ext}
conf.RefreshInterval = config.Duration(1 * time.Hour)
var (
fsd = NewFileDiscovery(&conf)
ch = make(chan *config.TargetGroup)
done = make(chan struct{})
)
go fsd.Run(ch, done)
defer close(done)
select {
case <-time.After(25 * time.Millisecond):
// Expected.
case tg := <-ch:
t.Fatalf("Unexpected target group in file discovery: %s", tg)
}
newf, err := os.Create("fixtures/_test" + ext)
if err != nil {
t.Fatal(err)
}
defer newf.Close()
f, err := os.Open("fixtures/target_groups" + ext)
if err != nil {
t.Fatal(err)
}
defer f.Close()
_, err = io.Copy(newf, f)
if err != nil {
t.Fatal(err)
}
newf.Close()
// The files contain two target groups which are read and sent in order.
select {
case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none")
case tg := <-ch:
if _, ok := tg.Labels["foo"]; !ok {
t.Fatalf("Label not parsed")
}
if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) {
t.Fatalf("Unexpected target group", tg)
}
}
select {
case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none")
case tg := <-ch:
if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) {
t.Fatalf("Unexpected target group %s", tg)
}
}
// Based on unknown circumstances, sometimes fsnotify will trigger more events in
// some runs (which might be empty, chains of different operations etc.).
// We have to drain those (as the target manager would) to avoid deadlocking and must
// not try to make sense of it all...
go func() {
for tg := range ch {
// Below we will change the file to a bad syntax. Previously extracted target
// groups must not be deleted via sending an empty target group.
if len(tg.Targets) == 0 {
t.Errorf("Unexpected empty target group received: %s", tg)
}
}
}()
newf, err = os.Create("fixtures/_test.new")
if err != nil {
t.Fatal(err)
}
defer os.Remove(newf.Name())
if _, err := newf.Write([]byte("]gibberish\n][")); err != nil {
t.Fatal(err)
}
newf.Close()
os.Rename(newf.Name(), "fixtures/_test"+ext)
// Give notifcations some time to arrive.
time.Sleep(50 * time.Millisecond)
}

View File

@ -0,0 +1,11 @@
[
{
"targets": ["localhost:9090", "example.org:443"],
"labels": {
"foo": "bar"
}
},
{
"targets": ["my.domain"]
}
]

View File

@ -0,0 +1,5 @@
- targets: ['localhost:9090', 'example.org:443']
labels:
foo: bar
- targets: ['my.domain']

View File

@ -0,0 +1,30 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
)
func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.KubernetesDiscovery, error) {
kd := &kubernetes.KubernetesDiscovery{
Conf: conf,
}
err := kd.Initialize()
if err != nil {
return nil, err
}
return kd, nil
}

View File

@ -0,0 +1,577 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sync"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
"github.com/prometheus/prometheus/util/strutil"
)
const (
sourceServicePrefix = "services"
// kubernetesMetaLabelPrefix is the meta prefix used for all meta labels.
// in this discovery.
metaLabelPrefix = clientmodel.MetaLabelPrefix + "kubernetes_"
// nodeLabel is the name for the label containing a target's node name.
nodeLabel = metaLabelPrefix + "node"
// serviceNamespaceLabel is the name for the label containing a target's service namespace.
serviceNamespaceLabel = metaLabelPrefix + "service_namespace"
// serviceNameLabel is the name for the label containing a target's service name.
serviceNameLabel = metaLabelPrefix + "service_name"
// nodeLabelPrefix is the prefix for the node labels.
nodeLabelPrefix = metaLabelPrefix + "node_label_"
// serviceLabelPrefix is the prefix for the service labels.
serviceLabelPrefix = metaLabelPrefix + "service_label_"
// serviceAnnotationPrefix is the prefix for the service annotations.
serviceAnnotationPrefix = metaLabelPrefix + "service_annotation_"
// nodesTargetGroupName is the name given to the target group for nodes.
nodesTargetGroupName = "nodes"
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
apiVersion = "v1"
apiPrefix = "api/" + apiVersion
nodesURL = apiPrefix + "/nodes"
servicesURL = apiPrefix + "/services"
endpointsURL = apiPrefix + "/endpoints"
serviceEndpointsURL = apiPrefix + "/namespaces/%s/endpoints/%s"
)
type KubernetesDiscovery struct {
client *http.Client
Conf *config.KubernetesSDConfig
nodesResourceVersion string
servicesResourceVersion string
endpointsResourceVersion string
nodes map[string]*Node
services map[string]map[string]*Service
nodesMu sync.RWMutex
servicesMu sync.RWMutex
runDone chan struct{}
}
func (kd *KubernetesDiscovery) Initialize() error {
client, err := newKubernetesHTTPClient(kd.Conf)
if err != nil {
return err
}
kd.client = client
kd.nodes = map[string]*Node{}
kd.services = map[string]map[string]*Service{}
kd.runDone = make(chan struct{})
return nil
}
// Sources implements the TargetProvider interface.
func (kd *KubernetesDiscovery) Sources() []string {
res, err := kd.client.Get(kd.Conf.Server + nodesURL)
if err != nil {
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
// & log & return empty.
log.Errorf("Unable to list Kubernetes nodes: %s", err)
return []string{}
}
if res.StatusCode != http.StatusOK {
log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status)
return []string{}
}
var nodes NodeList
if err := json.NewDecoder(res.Body).Decode(&nodes); err != nil {
body, _ := ioutil.ReadAll(res.Body)
log.Errorf("Unable to list Kubernetes nodes. Unexpected response body: %s", string(body))
return []string{}
}
kd.nodesMu.Lock()
defer kd.nodesMu.Unlock()
sourceNames := make([]string, 0, len(nodes.Items))
kd.nodesResourceVersion = nodes.ResourceVersion
for idx, node := range nodes.Items {
sourceNames = append(sourceNames, nodesTargetGroupName+":"+node.ObjectMeta.Name)
kd.nodes[node.ObjectMeta.Name] = &nodes.Items[idx]
}
res, err = kd.client.Get(kd.Conf.Server + servicesURL)
if err != nil {
// If we can't list services then we can't watch them. Assume this is a misconfiguration
// & log & return empty.
log.Errorf("Unable to list Kubernetes services: %s", err)
return []string{}
}
if res.StatusCode != http.StatusOK {
log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status)
return []string{}
}
var services ServiceList
if err := json.NewDecoder(res.Body).Decode(&services); err != nil {
body, _ := ioutil.ReadAll(res.Body)
log.Errorf("Unable to list Kubernetes services. Unexpected response body: %s", string(body))
return []string{}
}
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
kd.servicesResourceVersion = services.ResourceVersion
for idx, service := range services.Items {
sourceNames = append(sourceNames, serviceSource(&service))
namespace, ok := kd.services[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
kd.services[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = &services.Items[idx]
}
return sourceNames
}
// Run implements the TargetProvider interface.
func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
select {
case ch <- kd.updateNodesTargetGroup():
case <-done:
return
}
for _, ns := range kd.services {
for _, service := range ns {
select {
case ch <- kd.addService(service):
case <-done:
return
}
}
}
retryInterval := time.Duration(kd.Conf.RetryInterval)
update := make(chan interface{}, 10)
go kd.watchNodes(update, done, retryInterval)
go kd.watchServices(update, done, retryInterval)
go kd.watchServiceEndpoints(update, done, retryInterval)
var tg *config.TargetGroup
for {
select {
case <-done:
return
case event := <-update:
switch obj := event.(type) {
case *nodeEvent:
kd.updateNode(obj.Node, obj.EventType)
tg = kd.updateNodesTargetGroup()
case *serviceEvent:
tg = kd.updateService(obj.Service, obj.EventType)
case *endpointsEvent:
tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)
}
}
select {
case ch <- tg:
case <-done:
return
}
}
}
func (kd *KubernetesDiscovery) updateNodesTargetGroup() *config.TargetGroup {
kd.nodesMu.Lock()
defer kd.nodesMu.Unlock()
tg := &config.TargetGroup{Source: nodesTargetGroupName}
// Now let's loop through the nodes & add them to the target group with appropriate labels.
for nodeName, node := range kd.nodes {
address := fmt.Sprintf("%s:%d", node.Status.Addresses[0].Address, kd.Conf.KubeletPort)
t := clientmodel.LabelSet{
clientmodel.AddressLabel: clientmodel.LabelValue(address),
nodeLabel: clientmodel.LabelValue(nodeName),
}
for k, v := range node.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(nodeLabelPrefix + k)
t[clientmodel.LabelName(labelName)] = clientmodel.LabelValue(v)
}
tg.Targets = append(tg.Targets, t)
}
return tg
}
func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) {
kd.nodesMu.Lock()
defer kd.nodesMu.Unlock()
updatedNodeName := node.ObjectMeta.Name
switch eventType {
case deleted:
// Deleted - remove from nodes map.
delete(kd.nodes, updatedNodeName)
case added, modified:
// Added/Modified - update the node in the nodes map.
kd.nodes[updatedNodeName] = node
}
}
// watchNodes watches nodes as they come & go.
func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil)
if err != nil {
log.Errorf("Failed to watch nodes: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", kd.nodesResourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.client.Do(req)
if err != nil {
log.Errorf("Failed to watch nodes: %s", err)
return
}
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch nodes: %s", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event nodeEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Failed to watch nodes: %s", err)
return
}
kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
// watchServices watches services as they come & go.
func (kd *KubernetesDiscovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil)
if err != nil {
log.Errorf("Failed to watch services: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", kd.servicesResourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.client.Do(req)
if err != nil {
log.Errorf("Failed to watch services: %s", err)
return
}
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch services: %s", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event serviceEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Unable to watch services: %s", err)
return
}
kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
var (
name = service.ObjectMeta.Name
namespace = service.ObjectMeta.Namespace
_, exists = kd.services[namespace][name]
)
switch eventType {
case deleted:
if exists {
return kd.deleteService(service)
}
case added, modified:
return kd.addService(service)
}
return nil
}
func (kd *KubernetesDiscovery) deleteService(service *Service) *config.TargetGroup {
tg := &config.TargetGroup{Source: serviceSource(service)}
delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
delete(kd.services, service.ObjectMeta.Namespace)
}
return tg
}
func (kd *KubernetesDiscovery) addService(service *Service) *config.TargetGroup {
namespace, ok := kd.services[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
kd.services[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = service
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
res, err := kd.client.Get(kd.Conf.Server + endpointURL)
if err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to get service endpoints: %s", res.StatusCode)
return nil
}
var endpoints Endpoints
if err := json.NewDecoder(res.Body).Decode(&endpoints); err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return nil
}
return kd.updateServiceTargetGroup(service, &endpoints)
}
func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints) *config.TargetGroup {
tg := &config.TargetGroup{
Source: serviceSource(service),
Labels: clientmodel.LabelSet{
serviceNamespaceLabel: clientmodel.LabelValue(service.ObjectMeta.Namespace),
serviceNameLabel: clientmodel.LabelValue(service.ObjectMeta.Name),
},
}
for k, v := range service.ObjectMeta.Labels {
labelName := strutil.SanitizeLabelName(serviceLabelPrefix + k)
tg.Labels[clientmodel.LabelName(labelName)] = clientmodel.LabelValue(v)
}
for k, v := range service.ObjectMeta.Annotations {
labelName := strutil.SanitizeLabelName(serviceAnnotationPrefix + k)
tg.Labels[clientmodel.LabelName(labelName)] = clientmodel.LabelValue(v)
}
// Now let's loop through the endpoints & add them to the target group with appropriate labels.
for _, eps := range endpoints.Subsets {
epPort := eps.Ports[0].Port
for _, addr := range eps.Addresses {
ipAddr := addr.IP
if len(ipAddr) == net.IPv6len {
ipAddr = "[" + ipAddr + "]"
}
address := fmt.Sprintf("%s:%d", ipAddr, epPort)
t := clientmodel.LabelSet{clientmodel.AddressLabel: clientmodel.LabelValue(address)}
tg.Targets = append(tg.Targets, t)
}
}
return tg
}
// watchServiceEndpoints watches service endpoints as they come & go.
func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil)
if err != nil {
log.Errorf("Failed to watch service endpoints: %s", err)
return
}
values := req.URL.Query()
values.Add("watch", "true")
values.Add("resourceVersion", kd.servicesResourceVersion)
req.URL.RawQuery = values.Encode()
res, err := kd.client.Do(req)
if err != nil {
log.Errorf("Failed to watch service endpoints: %s", err)
return
}
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch service endpoints: %s", res.StatusCode)
return
}
d := json.NewDecoder(res.Body)
for {
var event endpointsEvent
if err := d.Decode(&event); err != nil {
log.Errorf("Unable to watch service endpoints: %s", err)
return
}
kd.servicesResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, done)
}
func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
serviceNamespace := endpoints.ObjectMeta.Namespace
serviceName := endpoints.ObjectMeta.Name
if service, ok := kd.services[serviceNamespace][serviceName]; ok {
return kd.updateServiceTargetGroup(service, endpoints)
}
return nil
}
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {
bearerTokenFile := conf.BearerTokenFile
caFile := conf.CAFile
if conf.InCluster {
if len(bearerTokenFile) == 0 {
bearerTokenFile = serviceAccountToken
}
if len(caFile) == 0 {
// With recent versions, the CA certificate is provided as a token
// but we need to handle older versions too. In this case, don't
// set the CAFile & the configuration will have to use Insecure.
if _, err := os.Stat(serviceAccountCACert); err == nil {
caFile = serviceAccountCACert
}
}
}
tlsConfig := &tls.Config{InsecureSkipVerify: conf.Insecure}
// Load client cert if specified.
if len(conf.CertFile) > 0 && len(conf.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(conf.CertFile, conf.KeyFile)
if err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
caCertPool := x509.NewCertPool()
if len(caFile) > 0 {
// Load CA cert.
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, err
}
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
tr := &http.Transport{
Dial: func(netw, addr string) (c net.Conn, err error) {
c, err = net.DialTimeout(netw, addr, time.Duration(conf.RequestTimeout))
return
},
}
tr.TLSClientConfig = tlsConfig
var rt http.RoundTripper
rt = tr
bearerToken, err := ioutil.ReadFile(bearerTokenFile)
if err != nil {
return nil, err
}
if len(bearerToken) > 0 {
rt = httputil.NewBearerAuthRoundTripper(string(bearerToken), rt)
}
if len(conf.Username) > 0 && len(conf.Password) > 0 {
rt = httputil.NewBasicAuthRoundTripper(conf.Username, conf.Password, rt)
}
return &http.Client{
Transport: rt,
}, nil
}
func serviceSource(service *Service) string {
return sourceServicePrefix + ":" + service.ObjectMeta.Namespace + "/" + service.ObjectMeta.Name
}
// Until loops until stop channel is closed, running f every period.
// f may not be invoked if stop channel is already closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
select {
case <-stopCh:
return
default:
f()
}
for {
select {
case <-stopCh:
return
case <-time.After(period):
f()
}
}
}

View File

@ -0,0 +1,210 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
type EventType string
const (
added EventType = "ADDED"
modified EventType = "MODIFIED"
deleted EventType = "DELETED"
)
type nodeEvent struct {
EventType EventType `json:"type"`
Node *Node `json:"object"`
}
type serviceEvent struct {
EventType EventType `json:"type"`
Service *Service `json:"object"`
}
type endpointsEvent struct {
EventType EventType `json:"type"`
Endpoints *Endpoints `json:"object"`
}
// From here down types are copied from
// https://github.com/GoogleCloudPlatform/kubernetes/blob/master/pkg/api/v1/types.go
// with all currently irrelevant types/fields stripped out. This removes the
// need for any kubernetes dependencies, with the drawback of having to keep
// this file up to date.
// ListMeta describes metadata that synthetic resources must have, including lists and
// various status objects.
type ListMeta struct {
// An opaque value that represents the version of this response for use with optimistic
// concurrency and change monitoring endpoints. Clients must treat these values as opaque
// and values may only be valid for a particular resource or set of resources. Only servers
// will generate resource versions.
ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"`
}
// ObjectMeta is metadata that all persisted resources must have, which includes all objects
// users must create.
type ObjectMeta struct {
// Name is unique within a namespace. Name is required when creating resources, although
// some resources may allow a client to request the generation of an appropriate name
// automatically. Name is primarily intended for creation idempotence and configuration
// definition.
Name string `json:"name,omitempty" description:"string that identifies an object. Must be unique within a namespace; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/identifiers.md#names"`
// Namespace defines the space within which name must be unique. An empty namespace is
// equivalent to the "default" namespace, but "default" is the canonical representation.
// Not all objects are required to be scoped to a namespace - the value of this field for
// those objects will be empty.
Namespace string `json:"namespace,omitempty" description:"namespace of the object; must be a DNS_LABEL; cannot be updated; see http://releases.k8s.io/HEAD/docs/user-guide/namespaces.md"`
ResourceVersion string `json:"resourceVersion,omitempty" description:"string that identifies the internal version of this object that can be used by clients to determine when objects have changed; populated by the system, read-only; value must be treated as opaque by clients and passed unmodified back to the server: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#concurrency-control-and-consistency"`
// TODO: replace map[string]string with labels.LabelSet type
Labels map[string]string `json:"labels,omitempty" description:"map of string keys and values that can be used to organize and categorize objects; may match selectors of replication controllers and services; see http://releases.k8s.io/HEAD/docs/user-guide/labels.md"`
// Annotations are unstructured key value data stored with a resource that may be set by
// external tooling. They are not queryable and should be preserved when modifying
// objects.
Annotations map[string]string `json:"annotations,omitempty" description:"map of string keys and values that can be used by external tooling to store and retrieve arbitrary metadata about objects; see http://releases.k8s.io/HEAD/docs/user-guide/annotations.md"`
}
// Protocol defines network protocols supported for things like conatiner ports.
type Protocol string
const (
// ProtocolTCP is the TCP protocol.
ProtocolTCP Protocol = "TCP"
// ProtocolUDP is the UDP protocol.
ProtocolUDP Protocol = "UDP"
)
const (
// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
NamespaceAll string = ""
)
// Container represents a single container that is expected to be run on the host.
type Container struct {
// Required: This must be a DNS_LABEL. Each container in a pod must
// have a unique name.
Name string `json:"name" description:"name of the container; must be a DNS_LABEL and unique within the pod; cannot be updated"`
// Optional.
Image string `json:"image,omitempty" description:"Docker image name; see http://releases.k8s.io/HEAD/docs/user-guide/images.md"`
}
// Service is a named abstraction of software service (for example, mysql) consisting of local port
// (for example 3306) that the proxy listens on, and the selector that determines which pods
// will answer requests sent through the proxy.
type Service struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
}
// ServiceList holds a list of services.
type ServiceList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Service `json:"items" description:"list of services"`
}
// Endpoints is a collection of endpoints that implement the actual service. Example:
// Name: "mysvc",
// Subsets: [
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// },
// {
// Addresses: [{"ip": "10.10.3.3"}],
// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}]
// },
// ]
type Endpoints struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
// The set of all endpoints is the union of all subsets.
Subsets []EndpointSubset `json:"subsets" description:"sets of addresses and ports that comprise a service"`
}
// EndpointSubset is a group of addresses with a common set of ports. The
// expanded set of endpoints is the Cartesian product of Addresses x Ports.
// For example, given:
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// }
// The resulting set of endpoints can be viewed as:
// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
// b: [ 10.10.1.1:309, 10.10.2.2:309 ]
type EndpointSubset struct {
Addresses []EndpointAddress `json:"addresses,omitempty" description:"IP addresses which offer the related ports"`
Ports []EndpointPort `json:"ports,omitempty" description:"port numbers available on the related IP addresses"`
}
// EndpointAddress is a tuple that describes single IP address.
type EndpointAddress struct {
// The IP of this endpoint.
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip" description:"IP address of the endpoint"`
}
// EndpointPort is a tuple that describes a single port.
type EndpointPort struct {
// The port number.
Port int `json:"port" description:"port number of the endpoint"`
// The IP protocol for this port.
Protocol Protocol `json:"protocol,omitempty" description:"protocol for this port; must be UDP or TCP; TCP if unspecified"`
}
// EndpointsList is a list of endpoints.
type EndpointsList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Endpoints `json:"items" description:"list of endpoints"`
}
// NodeStatus is information about the current status of a node.
type NodeStatus struct {
// Queried from cloud provider, if available.
Addresses []NodeAddress `json:"addresses,omitempty" description:"list of addresses reachable to the node; see http://releases.k8s.io/HEAD/docs/admin/node.md#node-addresses" patchStrategy:"merge" patchMergeKey:"type"`
}
type NodeAddressType string
// These are valid address type of node.
const (
NodeHostName NodeAddressType = "Hostname"
NodeExternalIP NodeAddressType = "ExternalIP"
NodeInternalIP NodeAddressType = "InternalIP"
)
type NodeAddress struct {
Type NodeAddressType `json:"type" description:"node address type, one of Hostname, ExternalIP or InternalIP"`
Address string `json:"address" description:"the node address"`
}
// Node is a worker node in Kubernetes, formerly known as minion.
// Each node will have a unique identifier in the cache (i.e. in etcd).
type Node struct {
ObjectMeta `json:"metadata,omitempty" description:"standard object metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
// Status describes the current status of a Node
Status NodeStatus `json:"status,omitempty" description:"most recently observed status of the node; populated by the system, read-only; http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status"`
}
// NodeList is the whole list of all Nodes which have been registered with master.
type NodeList struct {
ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Node `json:"items" description:"list of nodes"`
}

View File

@ -0,0 +1,105 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"time"
"github.com/prometheus/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
// MarathonDiscovery provides service discovery based on a Marathon instance.
type MarathonDiscovery struct {
servers []string
refreshInterval time.Duration
done chan struct{}
lastRefresh map[string]*config.TargetGroup
client marathon.AppListClient
}
// NewMarathonDiscovery creates a new Marathon based discovery.
func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery {
return &MarathonDiscovery{
servers: conf.Servers,
refreshInterval: time.Duration(conf.RefreshInterval),
done: make(chan struct{}),
client: marathon.FetchMarathonApps,
}
}
// Sources implements the TargetProvider interface.
func (md *MarathonDiscovery) Sources() []string {
var sources []string
tgroups, err := md.fetchTargetGroups()
if err == nil {
for source := range tgroups {
sources = append(sources, source)
}
}
return sources
}
// Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
for {
select {
case <-done:
return
case <-time.After(md.refreshInterval):
err := md.updateServices(ch)
if err != nil {
log.Errorf("Error while updating services: %s", err)
}
}
}
}
func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups()
if err != nil {
return err
}
// Update services which are still present
for _, tg := range targetMap {
ch <- tg
}
// Remove services which did disappear
for source := range md.lastRefresh {
_, ok := targetMap[source]
if !ok {
log.Debugf("Removing group for %s", source)
ch <- &config.TargetGroup{Source: source}
}
}
md.lastRefresh = targetMap
return nil
}
func (md *MarathonDiscovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
url := marathon.RandomAppsURL(md.servers)
apps, err := md.client(url)
if err != nil {
return nil, err
}
groups := marathon.AppsToTargetGroups(apps)
return groups, nil
}

View File

@ -0,0 +1,47 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
"encoding/json"
"io/ioutil"
"net/http"
)
// AppListClient defines a function that can be used to get an application list from marathon.
type AppListClient func(url string) (*AppList, error)
// FetchMarathonApps requests a list of applications from a marathon server.
func FetchMarathonApps(url string) (*AppList, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return parseAppJSON(body)
}
func parseAppJSON(body []byte) (*AppList, error) {
apps := &AppList{}
err := json.Unmarshal(body, apps)
if err != nil {
return nil, err
}
return apps, nil
}

View File

@ -0,0 +1,32 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
clientmodel "github.com/prometheus/client_golang/model"
)
const (
// metaLabelPrefix is the meta prefix used for all meta labels in this discovery.
metaLabelPrefix = clientmodel.MetaLabelPrefix + "marathon_"
// appLabelPrefix is the prefix for the application labels.
appLabelPrefix = metaLabelPrefix + "app_label_"
// appLabel is used for the name of the app in Marathon.
appLabel clientmodel.LabelName = metaLabelPrefix + "app"
// imageLabel is the label that is used for the docker image running the service.
imageLabel clientmodel.LabelName = metaLabelPrefix + "image"
// taskLabel contains the mesos task name of the app instance.
taskLabel clientmodel.LabelName = metaLabelPrefix + "task"
)

View File

@ -0,0 +1,71 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
"fmt"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups.
func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup {
tgroups := map[string]*config.TargetGroup{}
for _, a := range apps.Apps {
group := createTargetGroup(&a)
tgroups[group.Source] = group
}
return tgroups
}
func createTargetGroup(app *App) *config.TargetGroup {
var (
targets = targetsForApp(app)
appName = clientmodel.LabelValue(app.ID)
image = clientmodel.LabelValue(app.Container.Docker.Image)
)
tg := &config.TargetGroup{
Targets: targets,
Labels: clientmodel.LabelSet{
appLabel: appName,
imageLabel: image,
},
Source: app.ID,
}
for ln, lv := range app.Labels {
ln = appLabelPrefix + ln
tg.Labels[clientmodel.LabelName(ln)] = clientmodel.LabelValue(lv)
}
return tg
}
func targetsForApp(app *App) []clientmodel.LabelSet {
targets := make([]clientmodel.LabelSet, 0, len(app.Tasks))
for _, t := range app.Tasks {
target := targetForTask(&t)
targets = append(targets, clientmodel.LabelSet{
clientmodel.AddressLabel: clientmodel.LabelValue(target),
taskLabel: clientmodel.LabelValue(t.ID),
})
}
return targets
}
func targetForTask(task *Task) string {
return fmt.Sprintf("%s:%d", task.Host, task.Ports[0])
}

View File

@ -0,0 +1,45 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
// Task describes one instance of a service running on Marathon.
type Task struct {
ID string `json:"id"`
Host string `json:"host"`
Ports []uint32 `json:"ports"`
}
// DockerContainer describes a container which uses the docker runtime.
type DockerContainer struct {
Image string `json:"image"`
}
// Container describes the runtime an app in running in.
type Container struct {
Docker DockerContainer `json:"docker"`
}
// App describes a service running on Marathon.
type App struct {
ID string `json:"id"`
Tasks []Task `json:"tasks"`
RunningTasks int `json:"tasksRunning"`
Labels map[string]string `json:"labels"`
Container Container `json:"container"`
}
// AppList is a list of Marathon apps.
type AppList struct {
Apps []App `json:"apps"`
}

View File

@ -0,0 +1,29 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package marathon
import (
"fmt"
"math/rand"
)
const appListPath string = "/v2/apps/?embed=apps.tasks"
// RandomAppsURL randomly selects a server from an array and creates
// an URL pointing to the app list.
func RandomAppsURL(servers []string) string {
// TODO: If possible update server list from Marathon at some point.
server := servers[rand.Intn(len(servers))]
return fmt.Sprintf("%s%s", server, appListPath)
}

View File

@ -0,0 +1,180 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"errors"
"testing"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
var marathonValidLabel = map[string]string{"prometheus": "yes"}
func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) {
ch := make(chan *config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{
Servers: []string{"http://localhost:8080"},
})
md.client = client
return ch, md
}
func TestMarathonSDHandleError(t *testing.T) {
var errTesting = errors.New("testing failure")
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return nil, errTesting
})
go func() {
select {
case tg := <-ch:
t.Fatalf("Got group: %s", tg)
default:
}
}()
err := md.updateServices(ch)
if err != errTesting {
t.Fatalf("Expected error: %s", err)
}
}
func TestMarathonSDEmptyList(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return &marathon.AppList{}, nil
})
go func() {
select {
case tg := <-ch:
t.Fatalf("Got group: %v", tg)
default:
}
}()
err := md.updateServices(ch)
if err != nil {
t.Fatalf("Got error: %s", err)
}
}
func marathonTestAppList(labels map[string]string, runningTasks int) *marathon.AppList {
task := marathon.Task{
ID: "test-task-1",
Host: "mesos-slave1",
Ports: []uint32{31000},
}
docker := marathon.DockerContainer{Image: "repo/image:tag"}
container := marathon.Container{Docker: docker}
app := marathon.App{
ID: "test-service",
Tasks: []marathon.Task{task},
RunningTasks: runningTasks,
Labels: labels,
Container: container,
}
return &marathon.AppList{
Apps: []marathon.App{app},
}
}
func TestMarathonSDSendGroup(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
go func() {
select {
case tg := <-ch:
if tg.Source != "test-service" {
t.Fatalf("Wrong target group name: %s", tg.Source)
}
if len(tg.Targets) != 1 {
t.Fatalf("Wrong number of targets: %v", tg.Targets)
}
tgt := tg.Targets[0]
if tgt[clientmodel.AddressLabel] != "mesos-slave1:31000" {
t.Fatalf("Wrong target address: %s", tgt[clientmodel.AddressLabel])
}
default:
t.Fatal("Did not get a target group.")
}
}()
err := md.updateServices(ch)
if err != nil {
t.Fatalf("Got error: %s", err)
}
}
func TestMarathonSDRemoveApp(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
go func() {
up1 := <-ch
up2 := <-ch
if up2.Source != up1.Source {
t.Fatalf("Source is different: %s", up2)
if len(up2.Targets) > 0 {
t.Fatalf("Got a non-empty target set: %s", up2.Targets)
}
}
}()
err := md.updateServices(ch)
if err != nil {
t.Fatalf("Got error on first update: %s", err)
}
md.client = func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 0), nil
}
err = md.updateServices(ch)
if err != nil {
t.Fatalf("Got error on second update: %s", err)
}
}
func TestMarathonSDSources(t *testing.T) {
_, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
sources := md.Sources()
if len(sources) != 1 {
t.Fatalf("Wrong number of sources: %s", sources)
}
}
func TestMarathonSDRunAndStop(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
md.refreshInterval = time.Millisecond * 10
done := make(chan struct{})
go func() {
select {
case <-ch:
close(done)
case <-time.After(md.refreshInterval * 3):
close(done)
t.Fatalf("Update took too long.")
}
}()
md.Run(ch, done)
select {
case <-ch:
default:
t.Fatalf("Channel not closed.")
}
}

View File

@ -0,0 +1,362 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/log"
"github.com/samuel/go-zookeeper/zk"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil"
)
const (
serversetNodePrefix = "member_"
serversetLabelPrefix = clientmodel.MetaLabelPrefix + "serverset_"
serversetStatusLabel = serversetLabelPrefix + "status"
serversetPathLabel = serversetLabelPrefix + "path"
serversetEndpointLabelPrefix = serversetLabelPrefix + "endpoint"
)
type serversetMember struct {
ServiceEndpoint serversetEndpoint
AdditionalEndpoints map[string]serversetEndpoint
Status string `json:"status"`
}
type serversetEndpoint struct {
Host string
Port int
}
type ZookeeperLogger struct {
}
// Implements zk.Logger
func (zl ZookeeperLogger) Printf(s string, i ...interface{}) {
log.Infof(s, i...)
}
// ServersetDiscovery retrieves target information from a Serverset server
// and updates them via watches.
type ServersetDiscovery struct {
conf *config.ServersetSDConfig
conn *zk.Conn
mu sync.RWMutex
sources map[string]*config.TargetGroup
sdUpdates *chan<- *config.TargetGroup
updates chan zookeeperTreeCacheEvent
treeCache *zookeeperTreeCache
}
// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
conn, _, err := zk.Connect(conf.Servers, time.Duration(conf.Timeout))
conn.SetLogger(ZookeeperLogger{})
if err != nil {
return nil
}
updates := make(chan zookeeperTreeCacheEvent)
sd := &ServersetDiscovery{
conf: conf,
conn: conn,
updates: updates,
sources: map[string]*config.TargetGroup{},
}
go sd.processUpdates()
sd.treeCache = NewZookeeperTreeCache(conn, conf.Paths[0], updates)
return sd
}
// Sources implements the TargetProvider interface.
func (sd *ServersetDiscovery) Sources() []string {
sd.mu.RLock()
defer sd.mu.RUnlock()
srcs := []string{}
for t := range sd.sources {
srcs = append(srcs, t)
}
return srcs
}
func (sd *ServersetDiscovery) processUpdates() {
defer sd.conn.Close()
for event := range sd.updates {
tg := &config.TargetGroup{
Source: event.Path,
}
sd.mu.Lock()
if event.Data != nil {
labelSet, err := parseServersetMember(*event.Data, event.Path)
if err == nil {
tg.Targets = []clientmodel.LabelSet{*labelSet}
sd.sources[event.Path] = tg
} else {
delete(sd.sources, event.Path)
}
} else {
delete(sd.sources, event.Path)
}
sd.mu.Unlock()
if sd.sdUpdates != nil {
*sd.sdUpdates <- tg
}
}
if sd.sdUpdates != nil {
close(*sd.sdUpdates)
}
}
// Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
// Send on everything we have seen so far.
sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- targetGroup
}
// Tell processUpdates to send future updates.
sd.sdUpdates = &ch
sd.mu.Unlock()
<-done
sd.treeCache.Stop()
}
func parseServersetMember(data []byte, path string) (*clientmodel.LabelSet, error) {
member := serversetMember{}
err := json.Unmarshal(data, &member)
if err != nil {
return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err)
}
labels := clientmodel.LabelSet{}
labels[serversetPathLabel] = clientmodel.LabelValue(path)
labels[clientmodel.AddressLabel] = clientmodel.LabelValue(
fmt.Sprintf("%s:%d", member.ServiceEndpoint.Host, member.ServiceEndpoint.Port))
labels[serversetEndpointLabelPrefix+"_host"] = clientmodel.LabelValue(member.ServiceEndpoint.Host)
labels[serversetEndpointLabelPrefix+"_port"] = clientmodel.LabelValue(fmt.Sprintf("%d", member.ServiceEndpoint.Port))
for name, endpoint := range member.AdditionalEndpoints {
cleanName := clientmodel.LabelName(strutil.SanitizeLabelName(name))
labels[serversetEndpointLabelPrefix+"_host_"+cleanName] = clientmodel.LabelValue(
endpoint.Host)
labels[serversetEndpointLabelPrefix+"_port_"+cleanName] = clientmodel.LabelValue(
fmt.Sprintf("%d", endpoint.Port))
}
labels[serversetStatusLabel] = clientmodel.LabelValue(member.Status)
return &labels, nil
}
type zookeeperTreeCache struct {
conn *zk.Conn
prefix string
events chan zookeeperTreeCacheEvent
zkEvents chan zk.Event
stop chan struct{}
head *zookeeperTreeCacheNode
}
type zookeeperTreeCacheEvent struct {
Path string
Data *[]byte
}
type zookeeperTreeCacheNode struct {
data *[]byte
events chan zk.Event
done chan struct{}
stopped bool
children map[string]*zookeeperTreeCacheNode
}
func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan zookeeperTreeCacheEvent) *zookeeperTreeCache {
tc := &zookeeperTreeCache{
conn: conn,
prefix: path,
events: events,
stop: make(chan struct{}),
}
tc.head = &zookeeperTreeCacheNode{
events: make(chan zk.Event),
children: map[string]*zookeeperTreeCacheNode{},
stopped: true,
}
err := tc.recursiveNodeUpdate(path, tc.head)
if err != nil {
log.Errorf("Error during initial read of Zookeeper: %s", err)
}
go tc.loop(err != nil)
return tc
}
func (tc *zookeeperTreeCache) Stop() {
tc.stop <- struct{}{}
}
func (tc *zookeeperTreeCache) loop(failureMode bool) {
retryChan := make(chan struct{})
failure := func() {
failureMode = true
time.AfterFunc(time.Second*10, func() {
retryChan <- struct{}{}
})
}
if failureMode {
failure()
}
for {
select {
case ev := <-tc.head.events:
log.Debugf("Received Zookeeper event: %s", ev)
if failureMode {
continue
}
if ev.Type == zk.EventNotWatching {
log.Infof("Lost connection to Zookeeper.")
failure()
} else {
path := strings.TrimPrefix(ev.Path, tc.prefix)
parts := strings.Split(path, "/")
node := tc.head
for _, part := range parts[1:] {
childNode := node.children[part]
if childNode == nil {
childNode = &zookeeperTreeCacheNode{
events: tc.head.events,
children: map[string]*zookeeperTreeCacheNode{},
done: make(chan struct{}, 1),
}
node.children[part] = childNode
}
node = childNode
}
err := tc.recursiveNodeUpdate(ev.Path, node)
if err != nil {
log.Errorf("Error during processing of Zookeeper event: %s", err)
failure()
} else if tc.head.data == nil {
log.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix)
failure()
}
}
case <-retryChan:
log.Infof("Attempting to resync state with Zookeeper")
err := tc.recursiveNodeUpdate(tc.prefix, tc.head)
if err != nil {
log.Errorf("Error during Zookeeper resync: %s", err)
failure()
} else {
log.Infof("Zookeeper resync successful")
failureMode = false
}
case <-tc.stop:
close(tc.events)
return
}
}
}
func (tc *zookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTreeCacheNode) error {
data, _, dataWatcher, err := tc.conn.GetW(path)
if err == zk.ErrNoNode {
tc.recursiveDelete(path, node)
if node == tc.head {
return fmt.Errorf("path %s does not exist", path)
}
return nil
} else if err != nil {
return err
}
if node.data == nil || !bytes.Equal(*node.data, data) {
node.data = &data
tc.events <- zookeeperTreeCacheEvent{Path: path, Data: node.data}
}
children, _, childWatcher, err := tc.conn.ChildrenW(path)
if err == zk.ErrNoNode {
tc.recursiveDelete(path, node)
return nil
} else if err != nil {
return err
}
currentChildren := map[string]struct{}{}
for _, child := range children {
currentChildren[child] = struct{}{}
childNode := node.children[child]
// Does not already exists, create it.
if childNode == nil {
node.children[child] = &zookeeperTreeCacheNode{
events: node.events,
children: map[string]*zookeeperTreeCacheNode{},
done: make(chan struct{}, 1),
}
}
err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child])
if err != nil {
return err
}
}
// Remove nodes that no longer exist
for name, childNode := range node.children {
if _, ok := currentChildren[name]; !ok || node.data == nil {
tc.recursiveDelete(path+"/"+name, childNode)
delete(node.children, name)
}
}
go func() {
// Pass up zookeeper events, until the node is deleted.
select {
case event := <-dataWatcher:
node.events <- event
case event := <-childWatcher:
node.events <- event
case <-node.done:
}
}()
return nil
}
func (tc *zookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) {
if !node.stopped {
node.done <- struct{}{}
node.stopped = true
}
if node.data != nil {
tc.events <- zookeeperTreeCacheEvent{Path: path, Data: nil}
node.data = nil
}
for name, childNode := range node.children {
tc.recursiveDelete(path+"/"+name, childNode)
}
}