aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/godbus/dbus/v5/sequential_handler.go
blob: ef2fcdba179cf83942d8bb235601935b249a649d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package dbus

import (
	"sync"
)

// NewSequentialSignalHandler returns an instance of a new
// signal handler that guarantees sequential processing of signals. It is a
// guarantee of this signal handler that signals will be written to
// channels in the order they are received on the DBus connection.
func NewSequentialSignalHandler() SignalHandler {
	return &sequentialSignalHandler{}
}

type sequentialSignalHandler struct {
	mu      sync.RWMutex
	closed  bool
	signals []*sequentialSignalChannelData
}

func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	if sh.closed {
		return
	}
	for _, scd := range sh.signals {
		scd.deliver(signal)
	}
}

func (sh *sequentialSignalHandler) Terminate() {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}

	for _, scd := range sh.signals {
		scd.close()
		close(scd.ch)
	}
	sh.closed = true
	sh.signals = nil
}

func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}
	sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
}

func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}
	for i := len(sh.signals) - 1; i >= 0; i-- {
		if ch == sh.signals[i].ch {
			sh.signals[i].close()
			copy(sh.signals[i:], sh.signals[i+1:])
			sh.signals[len(sh.signals)-1] = nil
			sh.signals = sh.signals[:len(sh.signals)-1]
		}
	}
}

type sequentialSignalChannelData struct {
	ch   chan<- *Signal
	in   chan *Signal
	done chan struct{}
}

func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
	scd := &sequentialSignalChannelData{
		ch:   ch,
		in:   make(chan *Signal),
		done: make(chan struct{}),
	}
	go scd.bufferSignals()
	return scd
}

func (scd *sequentialSignalChannelData) bufferSignals() {
	defer close(scd.done)

	// Ensure that signals are delivered to scd.ch in the same
	// order they are received from scd.in.
	var queue []*Signal
	for {
		if len(queue) == 0 {
			signal, ok := <- scd.in
			if !ok {
				return
			}
			queue = append(queue, signal)
		}
		select {
		case scd.ch <- queue[0]:
			copy(queue, queue[1:])
			queue[len(queue)-1] = nil
			queue = queue[:len(queue)-1]
		case signal, ok := <-scd.in:
			if !ok {
				return
			}
			queue = append(queue, signal)
		}
	}
}

func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
	scd.in <- signal
}

func (scd *sequentialSignalChannelData) close() {
	close(scd.in)
	// Ensure that bufferSignals() has exited and won't attempt
	// any future sends on scd.ch
	<-scd.done
}