telegraf 收集docker容器指标 inputs.docker
目录[[outputs.influxdb]][[inputs.docker]]docker configmeasurementdockerdocker_datadocker_metadatadocker_container_healthdocker_container_cpudocker_container_memdocker_conta...
目录
missing docker_container_net和docker_container_blkio
telegraf版本:v1.6
[[outputs.influxdb]]
urls: 指定influxdb服务器地址,可以有多个
database: 指定需要在influxdb里面创建的数据库的名称,也就是说telegraf采集的数据存放在influxdb的哪个数据库里面,默认是telegraf。
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["unix:///var/run/influxdb.sock"]
# urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
## The target database for metrics; will be created as needed.
# database = "telegraf"
## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the
## database already exists.
# skip_database_creation = false
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy.
# retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
# write_consistency = "any"
## Timeout for HTTP messages.
# timeout = "5s"
## HTTP Basic Auth
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## HTTP User-Agent
# user_agent = "telegraf"
## UDP payload size is the maximum packet size to send.
# udp_payload = 512
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
[[inputs.docker]]
读取docker容器的指标信息,如果你要收集docker容器里面的指标,就需要配置这个插件。
# # Read metrics about docker containers
# [[inputs.docker]]
# ## Docker Endpoint
# ## To use TCP, set endpoint = "tcp://[ip]:[port]"
# ## To use environment variables (ie, docker-machine), set endpoint = "ENV"
# endpoint = "unix:///var/run/docker.sock"
#
# ## Set to true to collect Swarm metrics(desired_replicas, running_replicas)
# gather_services = false
#
# ## Only collect metrics for these containers, collect all if empty
# container_names = []
#
# ## Containers to include and exclude. Globs accepted.
# ## Note that an empty array for both will include all containers
# container_name_include = []
# container_name_exclude = []
#
# ## Container states to include and exclude. Globs accepted.
# ## When empty only containers in the "running" state will be captured.
# # container_state_include = []
# # container_state_exclude = []
#
# ## Timeout for docker list, info, and stats commands
# timeout = "5s"
#
# ## Whether to report for each container per-device blkio (8:0, 8:1...) and
# ## network (eth0, eth1, ...) stats or not
# perdevice = true
# ## Whether to report for each container total blkio and network stats or not
# total = false
# ## Which environment variables should we use as a tag
# ##tag_env = ["JAVA_HOME", "HEAP_SIZE"]
#
# ## docker labels to include and exclude as tags. Globs accepted.
# ## Note that an empty array for both will include all labels as tags
# docker_label_include = []
# docker_label_exclude = []
#
# ## Optional SSL Config
# # ssl_ca = "/etc/telegraf/ca.pem"
# # ssl_cert = "/etc/telegraf/cert.pem"
# # ssl_key = "/etc/telegraf/key.pem"
# ## Use SSL but skip chain & host verification
# # insecure_skip_verify = false
代码位置:github.com/influxdata/telegraf/plugins/inputs/docker/docker.go
docker config
// Docker object
type Docker struct {
Endpoint string
ContainerNames []string // deprecated in 1.4; use container_name_include
GatherServices bool `toml:"gather_services"`
Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
TagEnvironment []string `toml:"tag_env"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
ContainerStateInclude []string `toml:"container_state_include"`
ContainerStateExclude []string `toml:"container_state_exclude"`
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
InsecureSkipVerify bool
newEnvClient func() (Client, error)
newClient func(string, *tls.Config) (Client, error)
client Client
httpClient *http.Client
engine_host string
filtersCreated bool
labelFilter filter.Filter
containerFilter filter.Filter
stateFilter filter.Filter
}
measurement
docker
获取docker daemon信息,会在telegraf里面创建一个docker measurement。
func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
// Init vars
dataFields := make(map[string]interface{})
metadataFields := make(map[string]interface{})
now := time.Now()
// Get info from docker daemon
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
defer cancel()
info, err := d.client.Info(ctx)
if err != nil {
return err
}
d.engine_host = info.Name
fields := map[string]interface{}{
"n_cpus": info.NCPU,
"n_used_file_descriptors": info.NFd,
"n_containers": info.Containers,
"n_containers_running": info.ContainersRunning,
"n_containers_stopped": info.ContainersStopped,
"n_containers_paused": info.ContainersPaused,
"n_images": info.Images,
"n_goroutines": info.NGoroutines,
"n_listener_events": info.NEventsListener,
}
// Add metrics
acc.AddFields("docker",
fields,
map[string]string{"engine_host": d.engine_host},
now)
acc.AddFields("docker",
map[string]interface{}{"memory_total": info.MemTotal},
map[string]string{"unit": "bytes", "engine_host": d.engine_host},
now)
// Get storage metrics
for _, rawData := range info.DriverStatus {
// Try to convert string to int (bytes)
value, err := parseSize(rawData[1])
if err != nil {
continue
}
name := strings.ToLower(strings.Replace(rawData[0], " ", "_", -1))
if name == "pool_blocksize" {
// pool blocksize
acc.AddFields("docker",
map[string]interface{}{"pool_blocksize": value},
map[string]string{"unit": "bytes", "engine_host": d.engine_host},
now)
}
...
}
...
}
docker_data
func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
...
if len(dataFields) > 0 {
acc.AddFields("docker_data",
dataFields,
map[string]string{"unit": "bytes", "engine_host": d.engine_host},
now)
}
...
}
docker_metadata
func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
...
if len(metadataFields) > 0 {
acc.AddFields("docker_metadata",
metadataFields,
map[string]string{"unit": "bytes", "engine_host": d.engine_host},
now)
}
...
}
docker_container_health
func (d *Docker) gatherContainer(
container types.Container,
acc telegraf.Accumulator,
) error {
...
if info.State.Health != nil {
healthfields := map[string]interface{}{
"health_status": info.State.Health.Status,
"failing_streak": info.ContainerJSONBase.State.Health.FailingStreak,
}
acc.AddFields("docker_container_health", healthfields, tags, time.Now())
}
...
}
docker_container_cpu
func gatherContainerStats(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
id string,
perDevice bool,
total bool,
daemonOSType string,
) {
...
cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
"usage_in_kernelmode": stat.CPUStats.CPUUsage.UsageInKernelmode,
"usage_system": stat.CPUStats.SystemUsage,
"throttling_periods": stat.CPUStats.ThrottlingData.Periods,
"throttling_throttled_periods": stat.CPUStats.ThrottlingData.ThrottledPeriods,
"throttling_throttled_time": stat.CPUStats.ThrottlingData.ThrottledTime,
"container_id": id,
}
if daemonOSType != "windows" {
previousCPU := stat.PreCPUStats.CPUUsage.TotalUsage
previousSystem := stat.PreCPUStats.SystemUsage
cpuPercent := calculateCPUPercentUnix(previousCPU, previousSystem, stat)
cpufields["usage_percent"] = cpuPercent
} else {
cpuPercent := calculateCPUPercentWindows(stat)
cpufields["usage_percent"] = cpuPercent
}
cputags := copyTags(tags)
cputags["cpu"] = "cpu-total"
acc.AddFields("docker_container_cpu", cpufields, cputags, tm)
// If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs
// (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400)
var percpuusage []uint64
if stat.CPUStats.OnlineCPUs > 0 {
percpuusage = stat.CPUStats.CPUUsage.PercpuUsage[:stat.CPUStats.OnlineCPUs]
} else {
percpuusage = stat.CPUStats.CPUUsage.PercpuUsage
}
for i, percpu := range percpuusage {
percputags := copyTags(tags)
percputags["cpu"] = fmt.Sprintf("cpu%d", i)
fields := map[string]interface{}{
"usage_total": percpu,
"container_id": id,
}
acc.AddFields("docker_container_cpu", fields, percputags, tm)
}
...
}
docker_container_mem
func gatherContainerStats(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
id string,
perDevice bool,
total bool,
daemonOSType string,
) {
...
memfields := map[string]interface{}{
"container_id": id,
}
memstats := []string{
"active_anon",
"active_file",
"cache",
"hierarchical_memory_limit",
"inactive_anon",
"inactive_file",
"mapped_file",
"pgfault",
"pgmajfault",
"pgpgin",
"pgpgout",
"rss",
"rss_huge",
"total_active_anon",
"total_active_file",
"total_cache",
"total_inactive_anon",
"total_inactive_file",
"total_mapped_file",
"total_pgfault",
"total_pgmajfault",
"total_pgpgin",
"total_pgpgout",
"total_rss",
"total_rss_huge",
"total_unevictable",
"total_writeback",
"unevictable",
"writeback",
}
for _, field := range memstats {
if value, ok := stat.MemoryStats.Stats[field]; ok {
memfields[field] = value
}
}
if stat.MemoryStats.Failcnt != 0 {
memfields["fail_count"] = stat.MemoryStats.Failcnt
}
if daemonOSType != "windows" {
memfields["limit"] = stat.MemoryStats.Limit
memfields["usage"] = stat.MemoryStats.Usage
memfields["max_usage"] = stat.MemoryStats.MaxUsage
mem := calculateMemUsageUnixNoCache(stat.MemoryStats)
memLimit := float64(stat.MemoryStats.Limit)
memfields["usage_percent"] = calculateMemPercentUnixNoCache(memLimit, mem)
} else {
memfields["commit_bytes"] = stat.MemoryStats.Commit
memfields["commit_peak_bytes"] = stat.MemoryStats.CommitPeak
memfields["private_working_set"] = stat.MemoryStats.PrivateWorkingSet
}
acc.AddFields("docker_container_mem", memfields, tags, tm)
...
}
docker_container_net
func gatherContainerStats(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
id string,
perDevice bool,
total bool,
daemonOSType string,
) {
...
totalNetworkStatMap := make(map[string]interface{})
for network, netstats := range stat.Networks {
netfields := map[string]interface{}{
"rx_dropped": netstats.RxDropped,
"rx_bytes": netstats.RxBytes,
"rx_errors": netstats.RxErrors,
"tx_packets": netstats.TxPackets,
"tx_dropped": netstats.TxDropped,
"rx_packets": netstats.RxPackets,
"tx_errors": netstats.TxErrors,
"tx_bytes": netstats.TxBytes,
"container_id": id,
}
// Create a new network tag dictionary for the "network" tag
if perDevice {
nettags := copyTags(tags)
nettags["network"] = network
acc.AddFields("docker_container_net", netfields, nettags, tm)
}
if total {
for field, value := range netfields {
if field == "container_id" {
continue
}
var uintV uint64
switch v := value.(type) {
case uint64:
uintV = v
case int64:
uintV = uint64(v)
default:
continue
}
_, ok := totalNetworkStatMap[field]
if ok {
totalNetworkStatMap[field] = totalNetworkStatMap[field].(uint64) + uintV
} else {
totalNetworkStatMap[field] = uintV
}
}
}
}
// totalNetworkStatMap could be empty if container is running with --net=host.
if total && len(totalNetworkStatMap) != 0 {
nettags := copyTags(tags)
nettags["network"] = "total"
totalNetworkStatMap["container_id"] = id
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, tm)
}
...
}
docker_container_blkio
func gatherBlockIOMetrics(
stat *types.StatsJSON,
acc telegraf.Accumulator,
tags map[string]string,
tm time.Time,
id string,
perDevice bool,
total bool,
) {
blkioStats := stat.BlkioStats
// Make a map of devices to their block io stats
deviceStatMap := make(map[string]map[string]interface{})
for _, metric := range blkioStats.IoServiceBytesRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}
field := fmt.Sprintf("io_service_bytes_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IoServicedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
_, ok := deviceStatMap[device]
if !ok {
deviceStatMap[device] = make(map[string]interface{})
}
field := fmt.Sprintf("io_serviced_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IoQueuedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_queue_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IoServiceTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_service_time_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IoWaitTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_wait_time_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IoMergedRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
field := fmt.Sprintf("io_merged_recursive_%s", strings.ToLower(metric.Op))
deviceStatMap[device][field] = metric.Value
}
for _, metric := range blkioStats.IoTimeRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
deviceStatMap[device]["io_time_recursive"] = metric.Value
}
for _, metric := range blkioStats.SectorsRecursive {
device := fmt.Sprintf("%d:%d", metric.Major, metric.Minor)
deviceStatMap[device]["sectors_recursive"] = metric.Value
}
totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap {
fields["container_id"] = id
if perDevice {
iotags := copyTags(tags)
iotags["device"] = device
acc.AddFields("docker_container_blkio", fields, iotags, tm)
}
if total {
for field, value := range fields {
if field == "container_id" {
continue
}
var uintV uint64
switch v := value.(type) {
case uint64:
uintV = v
case int64:
uintV = uint64(v)
default:
continue
}
_, ok := totalStatMap[field]
if ok {
totalStatMap[field] = totalStatMap[field].(uint64) + uintV
} else {
totalStatMap[field] = uintV
}
}
}
}
if total {
totalStatMap["container_id"] = id
iotags := copyTags(tags)
iotags["device"] = "total"
acc.AddFields("docker_container_blkio", totalStatMap, iotags, tm)
}
}
docker_swarm
只有在[[inputs.docker]]里面指定了,才会有docker_swarm
gather_services=true
func (d *Docker) gatherSwarmInfo(acc telegraf.Accumulator) error {
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
defer cancel()
services, err := d.client.ServiceList(ctx, types.ServiceListOptions{})
if err != nil {
return err
}
if len(services) > 0 {
tasks, err := d.client.TaskList(ctx, types.TaskListOptions{})
if err != nil {
return err
}
nodes, err := d.client.NodeList(ctx, types.NodeListOptions{})
if err != nil {
return err
}
running := map[string]int{}
tasksNoShutdown := map[string]int{}
activeNodes := make(map[string]struct{})
for _, n := range nodes {
if n.Status.State != swarm.NodeStateDown {
activeNodes[n.ID] = struct{}{}
}
}
for _, task := range tasks {
if task.DesiredState != swarm.TaskStateShutdown {
tasksNoShutdown[task.ServiceID]++
}
if task.Status.State == swarm.TaskStateRunning {
running[task.ServiceID]++
}
}
for _, service := range services {
tags := map[string]string{}
fields := make(map[string]interface{})
now := time.Now()
tags["service_id"] = service.ID
tags["service_name"] = service.Spec.Name
if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
tags["service_mode"] = "replicated"
fields["tasks_running"] = running[service.ID]
fields["tasks_desired"] = *service.Spec.Mode.Replicated.Replicas
} else if service.Spec.Mode.Global != nil {
tags["service_mode"] = "global"
fields["tasks_running"] = running[service.ID]
fields["tasks_desired"] = tasksNoShutdown[service.ID]
} else {
log.Printf("E! Unknow Replicas Mode")
}
// Add metrics
acc.AddFields("docker_swarm",
fields,
tags,
now)
}
}
return nil
}
missing docker_container_net和docker_container_blkio
公司两台服务器A,B安装了influxdb用来接收telegraf采集的docker metric。相同的telegraf.conf配置信息.
A服务器收到了docker_container_cpu, docker_container_mem, docker_container_net,docker_container_blkio的metric
B服务器只收到了docker_container_cpu, docker_container_mem。没有docker_container_net,docker_container_blkio的metric。
A,B服务器influxdb收集telegraf采集的docker指标在我如之前就已经有的,入职后B服务器一直就没有docker_container_net,docker_container_blkio这两个measurement。
有天我专门花了一天时间排查原因,结合源码+telegraf.conf,还是找不出为什么,条件都是符合要求的。
结果第二天,B服务器竟然也收到了 docker_container_net,docker_container_blkio。
太奇怪了。
相关issue
telegraf cannot send docker‘s data to influxdb : https://github.com/influxdata/telegraf/issues/5443
docker_container_net is missing: https://github.com/influxdata/telegraf/issues/2069
更多推荐
所有评论(0)