// Copyright (c) 2017 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package remote import ( "fmt" "net/url" "sync" "time" "github.com/uber/jaeger-client-go/internal/baggage" thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage" "github.com/uber/jaeger-client-go/utils" ) type httpBaggageRestrictionManagerProxy struct { url string } func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy { v := url.Values{} v.Set("service", serviceName) return &httpBaggageRestrictionManagerProxy{ url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()), } } func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(serviceName string) ([]*thrift.BaggageRestriction, error) { var out []*thrift.BaggageRestriction if err := utils.GetJSON(s.url, &out); err != nil { return nil, err } return out, nil } // RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent type RestrictionManager struct { options mux sync.RWMutex serviceName string restrictions map[string]*baggage.Restriction thriftProxy thrift.BaggageRestrictionManager pollStopped sync.WaitGroup stopPoll chan struct{} invalidRestriction *baggage.Restriction validRestriction *baggage.Restriction // Determines if the manager has successfully retrieved baggage restrictions from agent initialized bool } // NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest // baggage restrictions. func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager { // TODO there is a developing use case where a single tracer can generate traces on behalf of many services. // restrictionsMap will need to exist per service opts := applyOptions(options...) m := &RestrictionManager{ serviceName: serviceName, options: opts, restrictions: make(map[string]*baggage.Restriction), thriftProxy: newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName), stopPoll: make(chan struct{}), invalidRestriction: baggage.NewRestriction(false, 0), validRestriction: baggage.NewRestriction(true, defaultMaxValueLength), } m.pollStopped.Add(1) go m.pollManager() return m } // isReady returns true if the manager has retrieved baggage restrictions from the remote source. func (m *RestrictionManager) isReady() bool { m.mux.RLock() defer m.mux.RUnlock() return m.initialized } // GetRestriction implements RestrictionManager#GetRestriction. func (m *RestrictionManager) GetRestriction(service, key string) *baggage.Restriction { m.mux.RLock() defer m.mux.RUnlock() if !m.initialized { if m.denyBaggageOnInitializationFailure { return m.invalidRestriction } return m.validRestriction } if restriction, ok := m.restrictions[key]; ok { return restriction } return m.invalidRestriction } // Close stops remote polling and closes the RemoteRestrictionManager. func (m *RestrictionManager) Close() error { close(m.stopPoll) m.pollStopped.Wait() return nil } func (m *RestrictionManager) pollManager() { defer m.pollStopped.Done() // attempt to initialize baggage restrictions if err := m.updateRestrictions(); err != nil { m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error())) } ticker := time.NewTicker(m.refreshInterval) defer ticker.Stop() for { select { case <-ticker.C: if err := m.updateRestrictions(); err != nil { m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error())) } case <-m.stopPoll: return } } } func (m *RestrictionManager) updateRestrictions() error { restrictions, err := m.thriftProxy.GetBaggageRestrictions(m.serviceName) if err != nil { m.metrics.BaggageRestrictionsUpdateFailure.Inc(1) return err } newRestrictions := m.parseRestrictions(restrictions) m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1) m.mux.Lock() defer m.mux.Unlock() m.initialized = true m.restrictions = newRestrictions return nil } func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction { setters := make(map[string]*baggage.Restriction, len(restrictions)) for _, restriction := range restrictions { setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength)) } return setters }