1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift"
"github.com/uber/jaeger-client-go/thrift-gen/agent"
"github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
)
// UDPPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent
const UDPPacketMaxLength = 65000
// AgentClientUDP is a UDP client to Jaeger agent that implements agent.Agent interface.
type AgentClientUDP struct {
agent.Agent
io.Closer
connUDP udpConn
client *agent.AgentClient
maxPacketSize int // max size of datagram in bytes
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
}
type udpConn interface {
Write([]byte) (int, error)
SetWriteBuffer(int) error
Close() error
}
// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should
// be passed to NewAgentClientUDPWithParams.
type AgentClientUDPParams struct {
HostPort string
MaxPacketSize int
Logger log.Logger
DisableAttemptReconnecting bool
AttemptReconnectInterval time.Duration
}
// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP.
func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) {
// validate hostport
if _, _, err := net.SplitHostPort(params.HostPort); err != nil {
return nil, err
}
if params.MaxPacketSize == 0 {
params.MaxPacketSize = UDPPacketMaxLength
}
if params.Logger == nil {
params.Logger = log.StdLogger
}
if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 {
params.AttemptReconnectInterval = time.Second * 30
}
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
protocolFactory := thrift.NewTCompactProtocolFactory()
client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory)
var connUDP udpConn
var err error
if params.DisableAttemptReconnecting {
destAddr, err := net.ResolveUDPAddr("udp", params.HostPort)
if err != nil {
return nil, err
}
connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
if err != nil {
return nil, err
}
} else {
// host is hostname, setup resolver loop in case host record changes during operation
connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
if err != nil {
return nil, err
}
}
if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
return nil, err
}
return &AgentClientUDP{
connUDP: connUDP,
client: client,
maxPacketSize: params.MaxPacketSize,
thriftBuffer: thriftBuffer,
}, nil
}
// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
return NewAgentClientUDPWithParams(AgentClientUDPParams{
HostPort: hostPort,
MaxPacketSize: maxPacketSize,
})
}
// EmitZipkinBatch implements EmitZipkinBatch() of Agent interface
func (a *AgentClientUDP) EmitZipkinBatch(context.Context, []*zipkincore.Span) error {
return errors.New("Not implemented")
}
// EmitBatch implements EmitBatch() of Agent interface
func (a *AgentClientUDP) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
a.thriftBuffer.Reset()
if err := a.client.EmitBatch(ctx, batch); err != nil {
return err
}
if a.thriftBuffer.Len() > a.maxPacketSize {
return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
}
_, err := a.connUDP.Write(a.thriftBuffer.Bytes())
return err
}
// Close implements Close() of io.Closer and closes the underlying UDP connection.
func (a *AgentClientUDP) Close() error {
return a.connUDP.Close()
}
|