diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/cgroups/blkio.go | 100 | ||||
-rw-r--r-- | pkg/cgroups/cgroups.go | 367 | ||||
-rw-r--r-- | pkg/cgroups/cgroups_supported.go | 27 | ||||
-rw-r--r-- | pkg/cgroups/cgroups_unsupported.go | 8 | ||||
-rw-r--r-- | pkg/cgroups/cpu.go | 97 | ||||
-rw-r--r-- | pkg/cgroups/cpuset.go | 80 | ||||
-rw-r--r-- | pkg/cgroups/memory.go | 60 | ||||
-rw-r--r-- | pkg/cgroups/pids.go | 62 | ||||
-rw-r--r-- | pkg/cgroups/systemd.go | 92 |
9 files changed, 893 insertions, 0 deletions
diff --git a/pkg/cgroups/blkio.go b/pkg/cgroups/blkio.go new file mode 100644 index 000000000..8434703fd --- /dev/null +++ b/pkg/cgroups/blkio.go @@ -0,0 +1,100 @@ +package cgroups + +import ( + "bufio" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + spec "github.com/opencontainers/runtime-spec/specs-go" +) + +type blkioHandler struct { +} + +func getBlkioHandler() *blkioHandler { + return &blkioHandler{} +} + +// Apply set the specified constraints +func (c *blkioHandler) Apply(ctr *CgroupControl, res *spec.LinuxResources) error { + if res.BlockIO == nil { + return nil + } + return fmt.Errorf("blkio apply function not implemented yet") +} + +// Create the cgroup +func (c *blkioHandler) Create(ctr *CgroupControl) (bool, error) { + if ctr.cgroup2 { + return false, fmt.Errorf("function not implemented yet") + } + return ctr.createCgroupDirectory(Blkio) +} + +// Destroy the cgroup +func (c *blkioHandler) Destroy(ctr *CgroupControl) error { + return os.Remove(ctr.getCgroupv1Path(Blkio)) +} + +// Stat fills a metrics structure with usage stats for the controller +func (c *blkioHandler) Stat(ctr *CgroupControl, m *Metrics) error { + var ioServiceBytesRecursive []BlkIOEntry + + if ctr.cgroup2 { + return fmt.Errorf("function not implemented yet") + } + + BlkioRoot := ctr.getCgroupv1Path(Blkio) + + p := filepath.Join(BlkioRoot, "blkio.throttle.io_service_bytes_recursive") + f, err := os.Open(p) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + var ioServiceBytesRecursive []BlkIOEntry + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + parts := strings.Fields(line) + if len(parts) < 3 { + continue + } + d := strings.Split(parts[0], ":") + if len(d) != 2 { + continue + } + minor, err := strconv.ParseUint(d[0], 10, 0) + if err != nil { + return err + } + major, err := strconv.ParseUint(d[1], 10, 0) + if err != nil { + return err + } + + op := parts[1] + + value, err := strconv.ParseUint(parts[2], 10, 0) + if err != nil { + return err + } + entry := BlkIOEntry{ + Op: op, + Major: major, + Minor: minor, + Value: value, + } + ioServiceBytesRecursive = append(ioServiceBytesRecursive, entry) + } + m.Blkio = BlkioMetrics{IoServiceBytesRecursive: ioServiceBytesRecursive} + return nil +} diff --git a/pkg/cgroups/cgroups.go b/pkg/cgroups/cgroups.go new file mode 100644 index 000000000..2a88c9db6 --- /dev/null +++ b/pkg/cgroups/cgroups.go @@ -0,0 +1,367 @@ +package cgroups + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + + spec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + // ErrCgroupDeleted means the cgroup was deleted + ErrCgroupDeleted = errors.New("cgroups: cgroup deleted") +) + +// CgroupControl controls a cgroup hierarchy +type CgroupControl struct { + cgroup2 bool + path string + systemd bool + // List of additional cgroup subsystems joined that + // do not have a custom handler. + additionalControllers []controller +} + +// CPUUsage keeps stats for the CPU usage +type CPUUsage struct { + Kernel uint64 + Total uint64 + PerCPU []uint64 +} + +// MemoryUsage keeps stats for the memory usage +type MemoryUsage struct { + Usage uint64 + Limit uint64 +} + +// CPUMetrics keeps stats for the CPU usage +type CPUMetrics struct { + Usage CPUUsage +} + +// BlkIOEntry describes an entry in the blkio stats +type BlkIOEntry struct { + Op string + Major uint64 + Minor uint64 + Value uint64 +} + +// BlkioMetrics keeps usage stats for the blkio cgroup controller +type BlkioMetrics struct { + IoServiceBytesRecursive []BlkIOEntry +} + +// MemoryMetrics keeps usage stats for the memory cgroup controller +type MemoryMetrics struct { + Usage MemoryUsage +} + +// PidsMetrics keeps usage stats for the pids cgroup controller +type PidsMetrics struct { + Current uint64 +} + +// Metrics keeps usage stats for the cgroup controllers +type Metrics struct { + CPU CPUMetrics + Blkio BlkioMetrics + Memory MemoryMetrics + Pids PidsMetrics +} + +type controller struct { + name string + symlink bool +} + +type controllerHandler interface { + Create(*CgroupControl) (bool, error) + Apply(*CgroupControl, *spec.LinuxResources) error + Destroy(*CgroupControl) error + Stat(*CgroupControl, *Metrics) error +} + +const ( + cgroupRoot = "/sys/fs/cgroup" + _cgroup2SuperMagic = 0x63677270 + // CPU is the cpu controller + CPU = "cpu" + // CPUAcct is the cpuacct controller + CPUAcct = "cpuacct" + // CPUset is the cpuset controller + CPUset = "cpuset" + // Memory is the memory controller + Memory = "memory" + // Pids is the pids controller + Pids = "pids" + // Blkio is the blkio controller + Blkio = "blkio" +) + +var handlers map[string]controllerHandler + +func init() { + handlers = make(map[string]controllerHandler) + handlers[CPU] = getCPUHandler() + handlers[CPUset] = getCpusetHandler() + handlers[Memory] = getMemoryHandler() + handlers[Pids] = getPidsHandler() + handlers[Blkio] = getBlkioHandler() +} + +// getAvailableControllers get the available controllers +func getAvailableControllers(exclude map[string]controllerHandler, cgroup2 bool) ([]controller, error) { + if cgroup2 { + return nil, fmt.Errorf("function not implemented yet") + } + + infos, err := ioutil.ReadDir(cgroupRoot) + if err != nil { + return nil, err + } + var controllers []controller + for _, i := range infos { + name := i.Name() + if _, found := exclude[name]; found { + continue + } + c := controller{ + name: name, + symlink: !i.IsDir(), + } + controllers = append(controllers, c) + } + return controllers, nil +} + +// getCgroupv1Path is a helper function to get the cgroup v1 path +func (c *CgroupControl) getCgroupv1Path(name string) string { + return filepath.Join(cgroupRoot, name, c.path) +} + +// initialize initializes the specified hierarchy +func (c *CgroupControl) initialize() (err error) { + createdSoFar := map[string]controllerHandler{} + defer func() { + if err != nil { + for name, ctr := range createdSoFar { + if err := ctr.Destroy(c); err != nil { + logrus.Warningf("error cleaning up controller %s for %s", name, c.path) + } + } + } + }() + for name, handler := range handlers { + created, err := handler.Create(c) + if err != nil { + return err + } + if created { + createdSoFar[name] = handler + } + } + + if !c.cgroup2 { + // We won't need to do this for cgroup v2 + for _, ctr := range c.additionalControllers { + if ctr.symlink { + continue + } + path := c.getCgroupv1Path(ctr.name) + if err := os.MkdirAll(path, 0755); err != nil { + return errors.Wrapf(err, "error creating cgroup path %s for %s", path, ctr.name) + } + } + } + + return nil +} + +func (c *CgroupControl) createCgroupDirectory(controller string) (bool, error) { + cPath := c.getCgroupv1Path(controller) + _, err := os.Stat(cPath) + if err == nil { + return false, nil + } + + if !os.IsNotExist(err) { + return false, err + } + + if err := os.MkdirAll(cPath, 0755); err != nil { + return false, errors.Wrapf(err, "error creating cgroup for %s", controller) + } + return true, nil +} + +func readFileAsUint64(path string) (uint64, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return 0, err + } + return strconv.ParseUint(cleanString(string(data)), 10, 0) +} + +func (c *CgroupControl) writePidToTasks(pid int, name string) error { + path := filepath.Join(c.getCgroupv1Path(name), "tasks") + payload := []byte(fmt.Sprintf("%d", pid)) + return ioutil.WriteFile(path, payload, 0644) +} + +// New creates a new cgroup control +func New(path string, resources *spec.LinuxResources) (*CgroupControl, error) { + cgroup2, err := IsCgroup2UnifiedMode() + if err != nil { + return nil, err + } + control := &CgroupControl{ + cgroup2: cgroup2, + path: path, + } + + if !cgroup2 { + controllers, err := getAvailableControllers(handlers, false) + if err != nil { + return nil, err + } + control.additionalControllers = controllers + } + + if err := control.initialize(); err != nil { + return nil, err + } + + return control, nil +} + +// NewSystemd creates a new cgroup control +func NewSystemd(path string) (*CgroupControl, error) { + cgroup2, err := IsCgroup2UnifiedMode() + if err != nil { + return nil, err + } + control := &CgroupControl{ + cgroup2: cgroup2, + path: path, + systemd: true, + } + return control, nil +} + +// Load loads an existing cgroup control +func Load(path string) (*CgroupControl, error) { + cgroup2, err := IsCgroup2UnifiedMode() + if err != nil { + return nil, err + } + control := &CgroupControl{ + cgroup2: cgroup2, + path: path, + systemd: false, + } + if !cgroup2 { + for name := range handlers { + p := control.getCgroupv1Path(name) + if _, err := os.Stat(p); err != nil { + if os.IsNotExist(err) { + // compatible with the error code + // used by containerd/cgroups + return nil, ErrCgroupDeleted + } + } + } + } + return control, nil +} + +// CreateSystemdUnit creates the systemd cgroup +func (c *CgroupControl) CreateSystemdUnit(path string) error { + if !c.systemd { + return fmt.Errorf("the cgroup controller is not using systemd") + } + return systemdCreate(path) +} + +// Delete cleans a cgroup +func (c *CgroupControl) Delete() error { + return c.DeleteByPath(c.path) +} + +// DeleteByPath deletes the specified cgroup path +func (c *CgroupControl) DeleteByPath(path string) error { + if c.systemd { + return systemdDestroy(path) + } + var lastError error + for _, h := range handlers { + if err := h.Destroy(c); err != nil { + lastError = err + } + } + + for _, ctr := range c.additionalControllers { + if err := os.Remove(c.getCgroupv1Path(ctr.name)); err != nil { + lastError = err + } + } + return lastError +} + +// Update updates the cgroups +func (c *CgroupControl) Update(resources *spec.LinuxResources) error { + for _, h := range handlers { + if err := h.Apply(c, resources); err != nil { + return err + } + } + return nil +} + +// AddPid moves the specified pid to the cgroup +func (c *CgroupControl) AddPid(pid int) error { + if c.cgroup2 { + return fmt.Errorf("function not implemented yet") + } + pidString := []byte(fmt.Sprintf("%d\n", pid)) + + var names []string + for n := range handlers { + names = append(names, n) + } + + for _, c := range c.additionalControllers { + if !c.symlink { + names = append(names, c.name) + } + } + + for _, n := range names { + p := filepath.Join(c.getCgroupv1Path(n), "tasks") + if err := ioutil.WriteFile(p, pidString, 0644); err != nil { + return err + } + } + return nil +} + +// Stat returns usage statistics for the cgroup +func (c *CgroupControl) Stat() (*Metrics, error) { + if c.cgroup2 { + return nil, fmt.Errorf("function not implemented yet") + } + m := Metrics{} + for _, h := range handlers { + if err := h.Stat(c, &m); err != nil { + return nil, err + } + } + return &m, nil +} diff --git a/pkg/cgroups/cgroups_supported.go b/pkg/cgroups/cgroups_supported.go new file mode 100644 index 000000000..fcd44dfc8 --- /dev/null +++ b/pkg/cgroups/cgroups_supported.go @@ -0,0 +1,27 @@ +// +build linux + +package cgroups + +import ( + "sync" + "syscall" +) + +var ( + isUnifiedOnce sync.Once + isUnified bool + isUnifiedErr error +) + +// IsCgroup2UnifiedMode returns whether we are running in cgroup 2 cgroup2 mode. +func IsCgroup2UnifiedMode() (bool, error) { + isUnifiedOnce.Do(func() { + var st syscall.Statfs_t + if err := syscall.Statfs("/sys/fs/cgroup", &st); err != nil { + isUnified, isUnifiedErr = false, err + } else { + isUnified, isUnifiedErr = st.Type == _cgroup2SuperMagic, nil + } + }) + return isUnified, isUnifiedErr +} diff --git a/pkg/cgroups/cgroups_unsupported.go b/pkg/cgroups/cgroups_unsupported.go new file mode 100644 index 000000000..9dc196e42 --- /dev/null +++ b/pkg/cgroups/cgroups_unsupported.go @@ -0,0 +1,8 @@ +// +build !linux + +package cgroups + +// IsCgroup2UnifiedMode returns whether we are running in cgroup 2 cgroup2 mode. +func IsCgroup2UnifiedMode() (bool, error) { + return false, nil +} diff --git a/pkg/cgroups/cpu.go b/pkg/cgroups/cpu.go new file mode 100644 index 000000000..d27f1257f --- /dev/null +++ b/pkg/cgroups/cpu.go @@ -0,0 +1,97 @@ +package cgroups + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + + spec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +type cpuHandler struct { +} + +func getCPUHandler() *cpuHandler { + return &cpuHandler{} +} + +func cleanString(s string) string { + return strings.Trim(s, "\n") +} + +func readAcct(ctr *CgroupControl, name string) (uint64, error) { + p := filepath.Join(ctr.getCgroupv1Path(CPUAcct), name) + return readFileAsUint64(p) +} + +func readAcctList(ctr *CgroupControl, name string) ([]uint64, error) { + var r []uint64 + + p := filepath.Join(ctr.getCgroupv1Path(CPUAcct), name) + data, err := ioutil.ReadFile(p) + if err != nil { + return nil, errors.Wrapf(err, "reading %s", p) + } + for _, s := range strings.Split(string(data), " ") { + s = cleanString(s) + if s == "" { + break + } + v, err := strconv.ParseUint(s, 10, 0) + if err != nil { + return nil, errors.Wrapf(err, "parsing %s", s) + } + r = append(r, v) + } + return r, nil +} + +// Apply set the specified constraints +func (c *cpuHandler) Apply(ctr *CgroupControl, res *spec.LinuxResources) error { + if res.CPU == nil { + return nil + } + return fmt.Errorf("function not implemented yet") +} + +// Create the cgroup +func (c *cpuHandler) Create(ctr *CgroupControl) (bool, error) { + if ctr.cgroup2 { + return false, fmt.Errorf("function not implemented yet") + } + return ctr.createCgroupDirectory(CPU) +} + +// Destroy the cgroup +func (c *cpuHandler) Destroy(ctr *CgroupControl) error { + return os.Remove(ctr.getCgroupv1Path(CPU)) +} + +// Stat fills a metrics structure with usage stats for the controller +func (c *cpuHandler) Stat(ctr *CgroupControl, m *Metrics) error { + if ctr.cgroup2 { + return fmt.Errorf("function not implemented yet") + } + + var err error + usage := CPUUsage{} + + usage.Total, err = readAcct(ctr, "cpuacct.usage") + if err != nil { + return err + } + usage.Kernel, err = readAcct(ctr, "cpuacct.usage_sys") + if err != nil { + return err + } + usage.PerCPU, err = readAcctList(ctr, "cpuacct.usage_percpu") + if err != nil { + return err + } + m.CPU = CPUMetrics{Usage: usage} + return nil +} diff --git a/pkg/cgroups/cpuset.go b/pkg/cgroups/cpuset.go new file mode 100644 index 000000000..15c649e46 --- /dev/null +++ b/pkg/cgroups/cpuset.go @@ -0,0 +1,80 @@ +package cgroups + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + + spec "github.com/opencontainers/runtime-spec/specs-go" +) + +type cpusetHandler struct { +} + +func cpusetCopyFileFromParent(dir, file string) ([]byte, error) { + if dir == cgroupRoot { + return nil, fmt.Errorf("could not find parent to initialize cpuset %s", file) + } + path := filepath.Join(dir, file) + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + if len(strings.Trim(string(data), "\n")) != 0 { + return data, nil + } + data, err = cpusetCopyFileFromParent(filepath.Dir(dir), file) + if err != nil { + return nil, err + } + if err := ioutil.WriteFile(path, data, 0644); err != nil { + return nil, err + } + return data, nil +} + +func cpusetCopyFromParent(path string) error { + for _, file := range []string{"cpuset.cpus", "cpuset.mems"} { + if _, err := cpusetCopyFileFromParent(path, file); err != nil { + return err + } + } + return nil +} + +func getCpusetHandler() *cpusetHandler { + return &cpusetHandler{} +} + +// Apply set the specified constraints +func (c *cpusetHandler) Apply(ctr *CgroupControl, res *spec.LinuxResources) error { + if res.CPU == nil { + return nil + } + return fmt.Errorf("function not implemented yet") +} + +// Create the cgroup +func (c *cpusetHandler) Create(ctr *CgroupControl) (bool, error) { + if ctr.cgroup2 { + return false, fmt.Errorf("function not implemented yet") + } + + created, err := ctr.createCgroupDirectory(CPUset) + if !created || err != nil { + return created, err + } + return true, cpusetCopyFromParent(ctr.getCgroupv1Path(CPUset)) +} + +// Destroy the cgroup +func (c *cpusetHandler) Destroy(ctr *CgroupControl) error { + return os.Remove(ctr.getCgroupv1Path(CPUset)) +} + +// Stat fills a metrics structure with usage stats for the controller +func (c *cpusetHandler) Stat(ctr *CgroupControl, m *Metrics) error { + return nil +} diff --git a/pkg/cgroups/memory.go b/pkg/cgroups/memory.go new file mode 100644 index 000000000..2224e4f1e --- /dev/null +++ b/pkg/cgroups/memory.go @@ -0,0 +1,60 @@ +package cgroups + +import ( + "fmt" + "os" + "path/filepath" + + spec "github.com/opencontainers/runtime-spec/specs-go" +) + +type memHandler struct { +} + +func getMemoryHandler() *memHandler { + return &memHandler{} +} + +// Apply set the specified constraints +func (c *memHandler) Apply(ctr *CgroupControl, res *spec.LinuxResources) error { + if res.Memory == nil { + return nil + } + return fmt.Errorf("function not implemented yet") +} + +// Create the cgroup +func (c *memHandler) Create(ctr *CgroupControl) (bool, error) { + if ctr.cgroup2 { + return false, fmt.Errorf("function not implemented yet") + } + return ctr.createCgroupDirectory(Memory) +} + +// Destroy the cgroup +func (c *memHandler) Destroy(ctr *CgroupControl) error { + return os.Remove(ctr.getCgroupv1Path(Memory)) +} + +// Stat fills a metrics structure with usage stats for the controller +func (c *memHandler) Stat(ctr *CgroupControl, m *Metrics) error { + if ctr.cgroup2 { + return fmt.Errorf("function not implemented yet") + } + usage := MemoryUsage{} + + memoryRoot := ctr.getCgroupv1Path(Memory) + + var err error + usage.Usage, err = readFileAsUint64(filepath.Join(memoryRoot, "memory.usage_in_bytes")) + if err != nil { + return err + } + usage.Limit, err = readFileAsUint64(filepath.Join(memoryRoot, "memory.limit_in_bytes")) + if err != nil { + return err + } + + m.Memory = MemoryMetrics{Usage: usage} + return nil +} diff --git a/pkg/cgroups/pids.go b/pkg/cgroups/pids.go new file mode 100644 index 000000000..f37ac7611 --- /dev/null +++ b/pkg/cgroups/pids.go @@ -0,0 +1,62 @@ +package cgroups + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + + spec "github.com/opencontainers/runtime-spec/specs-go" +) + +type pidHandler struct { +} + +func getPidsHandler() *pidHandler { + return &pidHandler{} +} + +// Apply set the specified constraints +func (c *pidHandler) Apply(ctr *CgroupControl, res *spec.LinuxResources) error { + if res.Pids == nil { + return nil + } + if ctr.cgroup2 { + return fmt.Errorf("function not implemented yet") + } + + p := filepath.Join(ctr.getCgroupv1Path(Pids), "pids.max") + return ioutil.WriteFile(p, []byte(fmt.Sprintf("%d\n", res.Pids.Limit)), 0644) +} + +// Create the cgroup +func (c *pidHandler) Create(ctr *CgroupControl) (bool, error) { + if ctr.cgroup2 { + return false, fmt.Errorf("function not implemented yet") + } + return ctr.createCgroupDirectory(Pids) +} + +// Destroy the cgroup +func (c *pidHandler) Destroy(ctr *CgroupControl) error { + return os.Remove(ctr.getCgroupv1Path(Pids)) +} + +// Stat fills a metrics structure with usage stats for the controller +func (c *pidHandler) Stat(ctr *CgroupControl, m *Metrics) error { + var PIDRoot string + + if ctr.cgroup2 { + return fmt.Errorf("function not implemented yet") + } + + PIDRoot := ctr.getCgroupv1Path(Pids) + + current, err := readFileAsUint64(filepath.Join(PIDRoot, "pids.current")) + if err != nil { + return err + } + + m.Pids = PidsMetrics{Current: current} + return nil +} diff --git a/pkg/cgroups/systemd.go b/pkg/cgroups/systemd.go new file mode 100644 index 000000000..e72e456bc --- /dev/null +++ b/pkg/cgroups/systemd.go @@ -0,0 +1,92 @@ +package cgroups + +import ( + "fmt" + "path/filepath" + "strings" + + systemdDbus "github.com/coreos/go-systemd/dbus" + "github.com/godbus/dbus" +) + +func systemdCreate(path string) error { + c, err := systemdDbus.New() + if err != nil { + return err + } + defer c.Close() + + slice, name := filepath.Split(path) + slice = strings.TrimSuffix(slice, "/") + + var lastError error + for i := 0; i < 2; i++ { + properties := []systemdDbus.Property{ + systemdDbus.PropDescription(fmt.Sprintf("cgroup %s", name)), + systemdDbus.PropWants(slice), + } + pMap := map[string]bool{ + "DefaultDependencies": false, + "MemoryAccounting": true, + "CPUAccounting": true, + "BlockIOAccounting": true, + } + if i == 0 { + pMap["Delegate"] = true + } + for k, v := range pMap { + p := systemdDbus.Property{ + Name: k, + Value: dbus.MakeVariant(v), + } + properties = append(properties, p) + } + + ch := make(chan string) + _, err = c.StartTransientUnit(name, "replace", properties, ch) + if err != nil { + lastError = err + continue + } + <-ch + return nil + } + return lastError +} + +/* + systemdDestroy is copied from containerd/cgroups/systemd.go file, that + has the following license: + + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +func systemdDestroy(path string) error { + c, err := systemdDbus.New() + if err != nil { + return err + } + defer c.Close() + + name := filepath.Base(path) + + ch := make(chan string) + _, err = c.StopUnit(name, "replace", ch) + if err != nil { + return err + } + <-ch + return nil +} |