// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/opentracing/opentracing-go/log"
)

// Span implements opentracing.Span.
type Span struct {
	// referenceCounter counts extra references taken with Retain. Release
	// decrements it and hands the span back to the tracer's span allocator
	// once the counter drops below zero. It is accessed via sync/atomic.
	referenceCounter int32

	// RWMutex is embedded so the span's methods can guard the mutable
	// fields below with s.Lock / s.RLock.
	sync.RWMutex

	// tracer is the Tracer this span belongs to; the span's methods use it
	// for the clock, the sampler, baggage handling, reporting and the
	// span allocator.
	tracer *Tracer

	// context is the span context of this span.
	// TODO: (breaking change) change to use a pointer
	context SpanContext

	// The name of the "operation" this span is an instance of.
	// Known as a "span name" in some implementations.
	operationName string

	// firstInProcess, if true, indicates that this span is the root of the (sub)tree
	// of spans in the current process. In other words it's true for the root spans,
	// and the ingress spans when the process joins another trace.
	firstInProcess bool

	// startTime is the timestamp indicating when the span began, with microseconds precision.
	startTime time.Time

	// duration is the duration of the span with microseconds precision.
	// Zero value means duration is unknown.
	duration time.Duration

	// tags attached to this span, in the order they were appended.
	tags []Tag

	// The span's "micro-log"
	logs []opentracing.LogRecord

	// The number of logs dropped because of MaxLogsPerSpan.
	numDroppedLogs int

	// references for this span
	references []Reference

	// observer is notified when the operation name changes, when a tag is
	// set and when the span finishes.
	observer ContribSpanObserver
}

// Tag pairs a tag key with its value.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
type Tag struct {
	key   string
	value interface{}
}

// NewTag builds a Tag holding the given key and value.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
func NewTag(key string, value interface{}) Tag {
	var tag Tag
	tag.key = key
	tag.value = value
	return tag
}

// SetOperationName sets or changes the operation name. While the sampling
// decision is not yet final, the sampler is consulted with the new name and
// its decision is applied. The observer is notified last.
func (s *Span) SetOperationName(operationName string) opentracing.Span {
	s.Lock()
	s.operationName = operationName
	s.Unlock()

	if finalized := s.isSamplingFinalized(); !finalized {
		s.applySamplingDecision(s.tracer.sampler.OnSetOperationName(s, operationName), true)
	}
	s.observer.OnSetOperationName(operationName)
	return s
}

// SetTag implements SetTag() of opentracing.Span.
// It delegates to setTagInternal and asks it to take the span lock itself.
func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
	const acquireLock = true
	return s.setTagInternal(key, value, acquireLock)
}

// setTagInternal records a tag on the span and always returns s.
//
// The observer is notified first. A sampling-priority tag is routed through
// setSamplingPriority, and the tag is discarded when that call reports no
// update. While the sampling decision is not final the sampler is consulted.
// The tag is appended only if the span is still writeable.
//
// lock tells the function whether it must acquire the span's write lock
// itself (true) or whether the caller already holds it (false).
func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
	s.observer.OnSetTag(key, value)

	if key == string(ext.SamplingPriority) {
		if updated := setSamplingPriority(s, value); !updated {
			return s
		}
	}

	if !s.isSamplingFinalized() {
		s.applySamplingDecision(s.tracer.sampler.OnSetTag(s, key, value), lock)
	}

	if !s.isWriteable() {
		return s
	}
	if lock {
		s.Lock()
		defer s.Unlock()
	}
	s.appendTagNoLocking(key, value)
	return s
}

// SpanContext returns a copy of the span's context.
// The span is only read, so a shared read lock is sufficient.
func (s *Span) SpanContext() SpanContext {
	s.RLock()
	defer s.RUnlock()
	return s.context
}

// StartTime returns the span's start time.
// The span is only read, so a shared read lock is sufficient.
func (s *Span) StartTime() time.Time {
	s.RLock()
	defer s.RUnlock()
	return s.startTime
}

// Duration returns the span's duration; it is zero until
// FinishWithOptions has recorded one.
// The span is only read, so a shared read lock is sufficient.
func (s *Span) Duration() time.Duration {
	s.RLock()
	defer s.RUnlock()
	return s.duration
}

// Tags returns the span's tags as a freshly allocated map. When the same key
// was appended more than once, the most recently appended value wins.
// The span is only read, so a shared read lock is sufficient.
func (s *Span) Tags() opentracing.Tags {
	s.RLock()
	defer s.RUnlock()
	result := make(opentracing.Tags, len(s.tags))
	for _, tag := range s.tags {
		result[tag.key] = tag.value
	}
	return result
}

// Logs returns a copy of the span's micro-logs. If logs were dropped, the
// copy (not the span's own buffer) is passed through fixLogs, which puts the
// records back in order and inserts a "dropped Span logs" marker record.
// The span itself is only read, so a shared read lock is sufficient.
func (s *Span) Logs() []opentracing.LogRecord {
	s.RLock()
	defer s.RUnlock()

	logs := append([]opentracing.LogRecord(nil), s.logs...)
	if s.numDroppedLogs != 0 {
		fixLogs(logs, s.numDroppedLogs)
	}

	return logs
}

// References returns the span's references converted to
// opentracing.SpanReference values, or nil when the span has none.
// The span is only read, so a shared read lock is sufficient.
func (s *Span) References() []opentracing.SpanReference {
	s.RLock()
	defer s.RUnlock()

	// len of a nil slice is 0, so no separate nil check is needed.
	if len(s.references) == 0 {
		return nil
	}

	result := make([]opentracing.SpanReference, len(s.references))
	for i, r := range s.references {
		result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
	}
	return result
}

// appendTagNoLocking appends a key/value tag to the span's tag list.
// It performs no locking of its own.
func (s *Span) appendTagNoLocking(key string, value interface{}) {
	tag := Tag{key: key, value: value}
	s.tags = append(s.tags, tag)
}

// LogFields implements opentracing.Span API.
// The fields are recorded only when the span's context is sampled.
func (s *Span) LogFields(fields ...log.Field) {
	s.Lock()
	defer s.Unlock()
	if s.context.IsSampled() {
		s.logFieldsNoLocking(fields...)
	}
}

// logFieldsNoLocking appends a log record carrying the given fields.
// The record is timestamped with the tracer's clock, the same time source
// Log and FinishWithOptions use, so all timestamps on a span agree even when
// the tracer's clock differs from time.Now.
//
// This function should only be called while holding a Write lock.
func (s *Span) logFieldsNoLocking(fields ...log.Field) {
	s.appendLogNoLocking(opentracing.LogRecord{
		Fields:    fields,
		Timestamp: s.tracer.timeNow(),
	})
}

// LogKV implements opentracing.Span API.
// Nothing is recorded for an unsampled span. If the alternating key/value
// list cannot be converted to fields, the conversion error is logged on the
// span instead.
func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
	s.RLock()
	skip := !s.context.IsSampled()
	s.RUnlock()
	if skip {
		return
	}

	fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
	if err == nil {
		s.LogFields(fields...)
		return
	}
	s.LogFields(log.Error(err), log.String("function", "LogKV"))
}

// LogEvent implements opentracing.Span API.
// It forwards to Log with only the event name set.
func (s *Span) LogEvent(event string) {
	data := opentracing.LogData{Event: event}
	s.Log(data)
}

// LogEventWithPayload implements opentracing.Span API.
// It forwards to Log with the event name and payload set.
func (s *Span) LogEventWithPayload(event string, payload interface{}) {
	data := opentracing.LogData{Event: event, Payload: payload}
	s.Log(data)
}

// Log implements opentracing.Span API.
// The entry is ignored for an unsampled span. A zero timestamp is replaced
// with the tracer's current time before the record is appended.
func (s *Span) Log(ld opentracing.LogData) {
	s.Lock()
	defer s.Unlock()

	if !s.context.IsSampled() {
		return
	}
	if ld.Timestamp.IsZero() {
		ld.Timestamp = s.tracer.timeNow()
	}
	s.appendLogNoLocking(ld.ToLogRecord())
}

// appendLogNoLocking stores lr in the span's log buffer, honoring the
// tracer's maxLogsPerSpan option (0 means no limit).
//
// This function should only be called while holding a Write lock.
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
	limit := s.tracer.options.maxLogsPerSpan
	if limit != 0 && len(s.logs) >= limit {
		// The buffer is full. The first `kept` logs are never touched; the
		// remaining `ring` slots form a circular buffer in which the oldest
		// entry is overwritten.
		kept := (limit - 1) / 2
		ring := limit - kept
		slot := kept + s.numDroppedLogs%ring
		s.logs[slot] = lr
		s.numDroppedLogs++
		return
	}
	s.logs = append(s.logs, lr)
}

// rotateLogBuffer rotates buf in place by pos positions to the left: the
// records originally at indexes 0 to pos-1 end up at the tail of the buffer.
func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
	// Swap-based rotation as described in:
	//    http://www.cplusplus.com/reference/algorithm/rotate
	head, mid, cursor := 0, pos, pos
	for head != mid {
		buf[head], buf[cursor] = buf[cursor], buf[head]
		head++
		cursor++
		switch {
		case cursor == len(buf):
			cursor = mid
		case head == mid:
			mid = cursor
		}
	}
}

// fixLogs repairs, in place, a log buffer from which numDroppedLogs records
// were dropped. The tail of such a buffer was used as a circular buffer (see
// appendLogNoLocking); fixLogs rotates it back into order and then overwrites
// its first slot with a record describing the dropped logs.
func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
	// The first `head` records were never overwritten; the rest is the ring.
	head := (len(logs) - 1) / 2
	ring := logs[head:]
	rotateLogBuffer(ring, numDroppedLogs%len(ring))

	// The first ring slot (the oldest "new" log) is sacrificed for the
	// marker, so effectively one more "new" log is dropped.
	marker := opentracing.LogRecord{
		// Keep the timestamp of the last dropped event.
		Timestamp: ring[0].Timestamp,
		Fields: []log.Field{
			log.String("event", "dropped Span logs"),
			log.Int("dropped_log_count", numDroppedLogs+1),
			log.String("component", "jaeger-client"),
		},
	}
	ring[0] = marker
}

// fixLogsIfDropped runs fixLogs on the span's own log buffer when logs were
// dropped, then clears the dropped-log counter. It does no locking itself;
// FinishWithOptions calls it while holding the write lock.
func (s *Span) fixLogsIfDropped() {
	if s.numDroppedLogs != 0 {
		fixLogs(s.logs, s.numDroppedLogs)
		s.numDroppedLogs = 0
	}
}

// SetBaggageItem implements SetBaggageItem() of opentracing.Span.
// While holding the span's write lock it delegates to the tracer's
// setBaggage, then returns the span itself so calls can be chained.
func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
	s.Lock()
	defer s.Unlock()
	s.tracer.setBaggage(s, key, value)
	return s
}

// BaggageItem implements BaggageItem() of opentracing.Span.
// It looks the key up in the span context's baggage under a read lock.
func (s *Span) BaggageItem(key string) string {
	s.RLock()
	defer s.RUnlock()
	item := s.context.baggage[key]
	return item
}

// Finish implements opentracing.Span API by finishing with default options.
// Once finished, the Span object goes back to the allocator unless the
// reporter retains it again, so it must not be used afterwards: it is no
// longer valid.
func (s *Span) Finish() {
	var defaults opentracing.FinishOptions
	s.FinishWithOptions(defaults)
}

// FinishWithOptions implements opentracing.Span API.
//
// Steps, in order: default the finish time to the tracer's clock, notify the
// observer, record the duration, give the sampler a last chance to decide if
// sampling is not final, attach the logs supplied in options (sampled spans
// only), and finally report the span to the tracer.
func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
	if options.FinishTime.IsZero() {
		options.FinishTime = s.tracer.timeNow()
	}
	s.observer.OnFinish(options)

	s.Lock()
	s.duration = options.FinishTime.Sub(s.startTime)
	s.Unlock()

	if finalized := s.isSamplingFinalized(); !finalized {
		s.applySamplingDecision(s.tracer.sampler.OnFinishSpan(s), true)
	}

	if s.context.IsSampled() {
		s.Lock()
		s.fixLogsIfDropped()
		// Note: bulk logs are not subject to maxLogsPerSpan limit
		if len(options.LogRecords) > 0 {
			s.logs = append(s.logs, options.LogRecords...)
		}
		for _, bulk := range options.BulkLogData {
			s.logs = append(s.logs, bulk.ToLogRecord())
		}
		s.Unlock()
	}

	// call reportSpan even for non-sampled traces, to return span to the pool
	// and update metrics counter
	s.tracer.reportSpan(s)
}

// Context implements opentracing.Span API and returns a copy of the span's
// context. The span is only read, so a shared read lock is sufficient.
func (s *Span) Context() opentracing.SpanContext {
	s.RLock()
	defer s.RUnlock()
	return s.context
}

// Tracer implements opentracing.Span API and returns the span's tracer.
// The field is read without taking the span lock.
func (s *Span) Tracer() opentracing.Tracer {
	tracer := s.tracer
	return tracer
}

// String returns the string form of the span's context, read under a
// read lock.
func (s *Span) String() string {
	s.RLock()
	defer s.RUnlock()
	repr := s.context.String()
	return repr
}

// OperationName returns the span's current operation name, read under a
// read lock.
func (s *Span) OperationName() string {
	s.RLock()
	defer s.RUnlock()
	name := s.operationName
	return name
}

// Retain atomically increments the span's reference counter, extending the
// lifetime of the object, and returns the span.
func (s *Span) Retain() *Span {
	const one int32 = 1
	atomic.AddInt32(&s.referenceCounter, one)
	return s
}

// Release atomically decrements the span's reference counter and returns the
// span to the tracer's span allocator when the counter drops below zero
// (that is, reaches -1).
func (s *Span) Release() {
	if remaining := atomic.AddInt32(&s.referenceCounter, -1); remaining != -1 {
		return
	}
	s.tracer.spanAllocator.Put(s)
}

// reset clears the span's state so the object can be reused, dropping its
// references to the tracer and observer.
func (s *Span) reset() {
	s.tracer = nil
	s.observer = nil
	s.context = emptyContext
	s.operationName = ""
	s.firstInProcess = false
	s.startTime, s.duration = time.Time{}, 0
	atomic.StoreInt32(&s.referenceCounter, 0)

	// Note: the slices are truncated rather than set to nil so that their
	// backing arrays can be reused.
	s.numDroppedLogs = 0
	s.tags, s.logs, s.references = s.tags[:0], s.logs[:0], s.references[:0]
}

// serviceName returns the service name configured on the span's tracer.
func (s *Span) serviceName() string {
	tracer := s.tracer
	return tracer.serviceName
}

// applySamplingDecision applies a sampler's decision to the span.
// A non-retryable decision finalizes the sampling state. A positive decision
// marks the span sampled and appends the decision's tags to the span.
//
// lock tells the function whether it must acquire the span's write lock
// itself before appending tags (true) or whether the caller already holds
// it (false).
func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
	if !decision.Retryable {
		s.context.samplingState.setFinal()
	}
	if !decision.Sample {
		return
	}

	s.context.samplingState.setSampled()
	if len(decision.Tags) == 0 {
		return
	}
	if lock {
		s.Lock()
		defer s.Unlock()
	}
	for _, samplerTag := range decision.Tags {
		s.appendTagNoLocking(samplerTag.key, samplerTag.value)
	}
}

// isWriteable reports whether the span can still be written to: either the
// sampling decision has not been finalized yet, or the span is sampled.
func (s *Span) isWriteable() bool {
	state := s.context.samplingState
	if state.isFinal() {
		return state.isSampled()
	}
	return true
}

// isSamplingFinalized reports whether the span's sampling state is final.
func (s *Span) isSamplingFinalized() bool {
	finalized := s.context.samplingState.isFinal()
	return finalized
}

// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
// Only values of type uint16 are accepted; any other type leaves the span untouched.
// Every successful update also finalizes the sampling state.
//
// The behavior of setSamplingPriority is surprising:
// If noDebugFlagOnForcedSampling is set
//     setSamplingPriority(span, 1) always sets only flagSampled
// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
//     setSamplingPriority(span, 1) sets both flagSampled and flagDebug
// However,
//     setSamplingPriority(span, 0) always only resets flagSampled
//
// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can
// leave flagDebug set
func setSamplingPriority(s *Span, value interface{}) bool {
	priority, isUint16 := value.(uint16)
	if !isUint16 {
		return false
	}

	switch {
	case priority == 0:
		s.context.samplingState.unsetSampled()
	case s.tracer.options.noDebugFlagOnForcedSampling:
		s.context.samplingState.setSampled()
	case s.tracer.isDebugAllowed(s.operationName):
		s.context.samplingState.setDebugAndSampled()
	default:
		// Forced sampling was requested but debug is not allowed right now.
		return false
	}
	s.context.samplingState.setFinal()
	return true
}

// EnableFirehose enables the firehose flag on the span context.
// The flag is set on the context's sampling state while holding the span's
// write lock.
func EnableFirehose(s *Span) {
	s.Lock()
	defer s.Unlock()
	s.context.samplingState.setFirehose()
}