From 2a0c949b9bab88f4e05d39b5e6d7db62bb39df11 Mon Sep 17 00:00:00 2001 From: Matthew Heon Date: Fri, 23 Feb 2018 15:28:56 -0500 Subject: Add tracking for container exec sessions to DB Signed-off-by: Matthew Heon Closes: #412 Approved by: baude --- libpod/boltdb_state.go | 3 + libpod/container.go | 26 +++++++- libpod/container_ffjson.go | 147 ++++++++++++++++++++++++++++++++++++++++++- libpod/sql_state.go | 29 +++++++-- libpod/sql_state_internal.go | 23 +++++-- libpod/test_common.go | 1 + 6 files changed, 217 insertions(+), 12 deletions(-) diff --git a/libpod/boltdb_state.go b/libpod/boltdb_state.go index 8f6bd579f..f5d0afc4a 100644 --- a/libpod/boltdb_state.go +++ b/libpod/boltdb_state.go @@ -140,6 +140,9 @@ func (s *BoltState) Refresh() error { state.Mountpoint = "" state.Mounted = false state.State = ContainerStateConfigured + state.IPAddress = "" + state.SubnetMask = "" + state.ExecSessions = make(map[string]int) newStateBytes, err := json.Marshal(state) if err != nil { diff --git a/libpod/container.go b/libpod/container.go index 42f13a992..83a45d379 100644 --- a/libpod/container.go +++ b/libpod/container.go @@ -141,6 +141,9 @@ type containerState struct { IPAddress string `json:"ipAddress"` // Subnet mask of container (if network namespace was created) SubnetMask string `json:"subnetMask"` + // ExecSessions contains active exec sessions for container + // Exec session ID is mapped to PID of exec process + ExecSessions map[string]int `json:"execSessions,omitempty"` } // ContainerConfig contains all information that was used to create the @@ -574,7 +577,8 @@ func (c *Container) OOMKilled() (bool, error) { } // PID returns the PID of the container -// An error is returned if the container is not running +// If the container is not running, a pid of 0 will be returned. No error will +// occur. func (c *Container) PID() (int, error) { if !c.locked { c.lock.Lock() @@ -588,6 +592,26 @@ func (c *Container) PID() (int, error) { return c.state.PID, nil } +// ExecSessions retrieves active exec sessions running in the container +// The result is a map from session ID to the PID of the exec process +func (c *Container) ExecSessions() (map[string]int, error) { + if !c.locked { + c.lock.Lock() + defer c.lock.Unlock() + + if err := c.syncContainer(); err != nil { + return nil, err + } + } + + returnMap := make(map[string]int, len(c.state.ExecSessions)) + for k, v := range c.state.ExecSessions { + returnMap[k] = v + } + + return returnMap, nil +} + // Misc Accessors // Most will require locking diff --git a/libpod/container_ffjson.go b/libpod/container_ffjson.go index 93a3bee20..077e0a635 100644 --- a/libpod/container_ffjson.go +++ b/libpod/container_ffjson.go @@ -2439,7 +2439,7 @@ func (j *containerState) MarshalJSONBuf(buf fflib.EncodingBuffer) error { var obj []byte _ = obj _ = err - buf.WriteString(`{"state":`) + buf.WriteString(`{ "state":`) fflib.FormatBits2(buf, uint64(j.State), 10, j.State < 0) buf.WriteByte(',') if len(j.ConfigPath) != 0 { @@ -2515,6 +2515,24 @@ func (j *containerState) MarshalJSONBuf(buf fflib.EncodingBuffer) error { fflib.WriteJsonString(buf, string(j.IPAddress)) buf.WriteString(`,"subnetMask":`) fflib.WriteJsonString(buf, string(j.SubnetMask)) + buf.WriteByte(',') + if len(j.ExecSessions) != 0 { + if j.ExecSessions == nil { + buf.WriteString(`"execSessions":null`) + } else { + buf.WriteString(`"execSessions":{ `) + for key, value := range j.ExecSessions { + fflib.WriteJsonString(buf, key) + buf.WriteString(`:`) + fflib.FormatBits2(buf, uint64(value), 10, value < 0) + buf.WriteByte(',') + } + buf.Rewind(1) + buf.WriteByte('}') + } + buf.WriteByte(',') + } + buf.Rewind(1) buf.WriteByte('}') return nil } @@ -2546,6 +2564,8 @@ const ( ffjtcontainerStateIPAddress ffjtcontainerStateSubnetMask + + ffjtcontainerStateExecSessions ) var ffjKeycontainerStateState = []byte("state") @@ -2572,6 +2592,8 @@ var ffjKeycontainerStateIPAddress = []byte("ipAddress") var ffjKeycontainerStateSubnetMask = []byte("subnetMask") +var ffjKeycontainerStateExecSessions = []byte("execSessions") + // UnmarshalJSON umarshall json - template of ffjson func (j *containerState) UnmarshalJSON(input []byte) error { fs := fflib.NewFFLexer(input) @@ -2647,6 +2669,11 @@ mainparse: currentKey = ffjtcontainerStateExitCode state = fflib.FFParse_want_colon goto mainparse + + } else if bytes.Equal(ffjKeycontainerStateExecSessions, kn) { + currentKey = ffjtcontainerStateExecSessions + state = fflib.FFParse_want_colon + goto mainparse } case 'f': @@ -2722,6 +2749,12 @@ mainparse: } + if fflib.EqualFoldRight(ffjKeycontainerStateExecSessions, kn) { + currentKey = ffjtcontainerStateExecSessions + state = fflib.FFParse_want_colon + goto mainparse + } + if fflib.EqualFoldRight(ffjKeycontainerStateSubnetMask, kn) { currentKey = ffjtcontainerStateSubnetMask state = fflib.FFParse_want_colon @@ -2847,6 +2880,9 @@ mainparse: case ffjtcontainerStateSubnetMask: goto handle_SubnetMask + case ffjtcontainerStateExecSessions: + goto handle_ExecSessions + case ffjtcontainerStatenosuchkey: err = fs.SkipField(tok) if err != nil { @@ -3201,6 +3237,115 @@ handle_SubnetMask: state = fflib.FFParse_after_value goto mainparse +handle_ExecSessions: + + /* handler: j.ExecSessions type=map[string]int kind=map quoted=false*/ + + { + + { + if tok != fflib.FFTok_left_bracket && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for ", tok)) + } + } + + if tok == fflib.FFTok_null { + j.ExecSessions = nil + } else { + + j.ExecSessions = make(map[string]int, 0) + + wantVal := true + + for { + + var k string + + var tmpJExecSessions int + + tok = fs.Scan() + if tok == fflib.FFTok_error { + goto tokerror + } + if tok == fflib.FFTok_right_bracket { + break + } + + if tok == fflib.FFTok_comma { + if wantVal == true { + // TODO(pquerna): this isn't an ideal error message, this handles + // things like [,,,] as an array value. + return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok)) + } + continue + } else { + wantVal = true + } + + /* handler: k type=string kind=string quoted=false*/ + + { + + { + if tok != fflib.FFTok_string && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for string", tok)) + } + } + + if tok == fflib.FFTok_null { + + } else { + + outBuf := fs.Output.Bytes() + + k = string(string(outBuf)) + + } + } + + // Expect ':' after key + tok = fs.Scan() + if tok != fflib.FFTok_colon { + return fs.WrapErr(fmt.Errorf("wanted colon token, but got token: %v", tok)) + } + + tok = fs.Scan() + /* handler: tmpJExecSessions type=int kind=int quoted=false*/ + + { + if tok != fflib.FFTok_integer && tok != fflib.FFTok_null { + return fs.WrapErr(fmt.Errorf("cannot unmarshal %s into Go value for int", tok)) + } + } + + { + + if tok == fflib.FFTok_null { + + } else { + + tval, err := fflib.ParseInt(fs.Output.Bytes(), 10, 64) + + if err != nil { + return fs.WrapErr(err) + } + + tmpJExecSessions = int(tval) + + } + } + + j.ExecSessions[k] = tmpJExecSessions + + wantVal = false + } + + } + } + + state = fflib.FFParse_after_value + goto mainparse + wantedvalue: return fs.WrapErr(fmt.Errorf("wanted value token, but got token: %v", tok)) wrongtokenerror: diff --git a/libpod/sql_state.go b/libpod/sql_state.go index cf759d836..abf48543f 100644 --- a/libpod/sql_state.go +++ b/libpod/sql_state.go @@ -14,7 +14,7 @@ import ( // DBSchema is the current DB schema version // Increments every time a change is made to the database's tables -const DBSchema = 11 +const DBSchema = 12 // SQLState is a state implementation backed by a persistent SQLite3 database type SQLState struct { @@ -102,7 +102,8 @@ func (s *SQLState) Refresh() (err error) { Pid=?, NetNSPath=?, IPAddress=?, - SubnetMask=?;` + SubnetMask=?, + ExecSessions=?;` if !s.valid { return ErrDBClosed @@ -132,7 +133,8 @@ func (s *SQLState) Refresh() (err error) { 0, "", "", - "") + "", + "{}") if err != nil { return errors.Wrapf(err, "error refreshing database state") } @@ -269,7 +271,8 @@ func (s *SQLState) UpdateContainer(ctr *Container) error { Pid, NetNSPath, IPAddress, - SubnetMask + SubnetMask, + ExecSessions FROM containerState WHERE ID=?;` var ( @@ -285,6 +288,7 @@ func (s *SQLState) UpdateContainer(ctr *Container) error { netNSPath string ipAddress string subnetMask string + execSessions string ) if !s.valid { @@ -308,7 +312,8 @@ func (s *SQLState) UpdateContainer(ctr *Container) error { &pid, &netNSPath, &ipAddress, - &subnetMask) + &subnetMask, + &execSessions) if err != nil { // The container may not exist in the database if err == sql.ErrNoRows { @@ -333,6 +338,11 @@ func (s *SQLState) UpdateContainer(ctr *Container) error { newState.IPAddress = ipAddress newState.SubnetMask = subnetMask + newState.ExecSessions = make(map[string]int) + if err := json.Unmarshal([]byte(execSessions), &newState.ExecSessions); err != nil { + return errors.Wrapf(err, "error parsing container %s exec sessions", ctr.ID()) + } + if newState.Mountpoint != "" { newState.Mounted = true } @@ -395,13 +405,19 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) { Pid=?, NetNSPath=?, IPAddress=?, - SubnetMask=? + SubnetMask=?, + ExecSessions=? WHERE Id=?;` if !ctr.valid { return ErrCtrRemoved } + execSessionsJSON, err := json.Marshal(ctr.state.ExecSessions) + if err != nil { + return errors.Wrapf(err, "error marshalling container %s exec sessions", ctr.ID()) + } + netNSPath := "" if ctr.state.NetNS != nil { netNSPath = ctr.state.NetNS.Path() @@ -439,6 +455,7 @@ func (s *SQLState) SaveContainer(ctr *Container) (err error) { netNSPath, ctr.state.IPAddress, ctr.state.SubnetMask, + execSessionsJSON, ctr.ID()) if err != nil { return errors.Wrapf(err, "error updating container %s state in database", ctr.ID()) diff --git a/libpod/sql_state_internal.go b/libpod/sql_state_internal.go index d0771e0d8..6f0e2284c 100644 --- a/libpod/sql_state_internal.go +++ b/libpod/sql_state_internal.go @@ -33,7 +33,8 @@ const ( containerState.Pid, containerState.NetNSPath, containerState.IPAddress, - containerState.SubnetMask + containerState.SubnetMask, + containerState.ExecSessions FROM containers INNER JOIN containerState ON containers.Id = containerState.Id ` @@ -274,6 +275,7 @@ func prepareDB(db *sql.DB) (err error) { NetNSPath TEXT NOT NULL, IPAddress TEXT NOT NULL, SubnetMask TEXT NOT NULL, + ExecSessions TEXT NOT NULL, CHECK (State>0), CHECK (OomKilled IN (0, 1)), @@ -483,6 +485,7 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) { netNSPath string ipAddress string subnetMask string + execSessions string ) err := row.Scan( @@ -536,7 +539,8 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) { &pid, &netNSPath, &ipAddress, - &subnetMask) + &subnetMask, + &execSessions) if err != nil { if err == sql.ErrNoRows { return nil, ErrNoSuchCtr @@ -616,6 +620,11 @@ func (s *SQLState) ctrFromScannable(row scannable) (*Container, error) { return nil, errors.Wrapf(err, "error parsing container %s DNS server JSON", id) } + ctr.state.ExecSessions = make(map[string]int) + if err := json.Unmarshal([]byte(execSessions), &ctr.state.ExecSessions); err != nil { + return nil, errors.Wrapf(err, "error parsing container %s exec sessions JSON", id) + } + labels := make(map[string]string) if err := json.Unmarshal([]byte(labelsJSON), &labels); err != nil { return nil, errors.Wrapf(err, "error parsing container %s labels JSON", id) @@ -753,7 +762,7 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) { addCtrState = `INSERT INTO containerState VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, - ?, ?, ? + ?, ?, ?, ? );` addRegistry = "INSERT INTO registry VALUES (?, ?);" checkCtrInPod = "SELECT 1 FROM containers WHERE Id=? AND Pod=?;" @@ -795,6 +804,11 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) { return errors.Wrapf(err, "error marshaling container %s labels to JSON", ctr.ID()) } + execSessionsJSON, err := json.Marshal(ctr.state.ExecSessions) + if err != nil { + return errors.Wrapf(err, "error marshalling container %s exec sessions to JSON", ctr.ID()) + } + netNSPath := "" if ctr.state.NetNS != nil { netNSPath = ctr.state.NetNS.Path() @@ -918,7 +932,8 @@ func (s *SQLState) addContainer(ctr *Container, pod *Pod) (err error) { ctr.state.PID, netNSPath, ctr.state.IPAddress, - ctr.state.SubnetMask) + ctr.state.SubnetMask, + execSessionsJSON) if err != nil { return errors.Wrapf(err, "error adding container %s state to database", ctr.ID()) } diff --git a/libpod/test_common.go b/libpod/test_common.go index e4a684f87..b03212a5a 100644 --- a/libpod/test_common.go +++ b/libpod/test_common.go @@ -55,6 +55,7 @@ func getTestContainer(id, name, locksDir string) (*Container, error) { Mounted: true, Mountpoint: "/does/not/exist/tmp/" + id, PID: 1234, + ExecSessions: map[string]int{"abcd": 101, "ef01": 202}, }, valid: true, } -- cgit v1.2.3-54-g00ecf