diff --git a/api/api_test.go b/api/api_test.go index d85a15a..d64ca00 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/pkg/reexec" "github.com/docker/libnetwork" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" "github.com/docker/libnetwork/testutils" @@ -88,11 +89,13 @@ func i2sbL(i interface{}) []*sandboxResource { } func createTestNetwork(t *testing.T, network string) (libnetwork.NetworkController, libnetwork.Network) { + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) } - defer c.Stop() netOption := options.Generic{ netlabel.GenericData: options.Generic{ @@ -175,6 +178,9 @@ func TestJson(t *testing.T) { func TestCreateDeleteNetwork(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -249,6 +255,9 @@ func TestCreateDeleteNetwork(t *testing.T) { func TestGetNetworksAndEndpoints(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -518,6 +527,9 @@ func TestGetNetworksAndEndpoints(t *testing.T) { func TestProcGetServices(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -686,6 +698,7 @@ func TestProcGetService(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() ep1, err := nw.CreateEndpoint("db") if err != nil { t.Fatal(err) @@ -738,6 +751,8 @@ func TestProcPublishUnpublishService(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, _ := createTestNetwork(t, "network") + defer c.Stop() + vars := make(map[string]string) vbad, err := json.Marshal("bad service create data") @@ -870,6 +885,7 @@ func TestAttachDetachBackend(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() ep1, err := nw.CreateEndpoint("db") if err != nil { t.Fatal(err) @@ -994,6 +1010,9 @@ func TestAttachDetachBackend(t *testing.T) { } func TestDetectGetNetworksInvalidQueryComposition(t *testing.T) { + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1011,6 +1030,7 @@ func TestDetectGetEndpointsInvalidQueryComposition(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, _ := createTestNetwork(t, "network") + defer c.Stop() vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"} _, errRsp := procGetEndpoints(c, vars, nil) @@ -1023,6 +1043,7 @@ func TestDetectGetServicesInvalidQueryComposition(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, _ := createTestNetwork(t, "network") + defer c.Stop() vars := map[string]string{urlNwName: "network", urlEpName: "x", urlEpPID: "y"} _, errRsp := procGetServices(c, vars, nil) @@ -1040,6 +1061,8 @@ func TestFindNetworkUtil(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() + nid := nw.ID() _, errRsp := findNetwork(c, "", byName) @@ -1102,6 +1125,9 @@ func TestFindNetworkUtil(t *testing.T) { func TestCreateDeleteEndpoints(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1225,6 +1251,9 @@ func TestCreateDeleteEndpoints(t *testing.T) { func TestJoinLeave(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1382,6 +1411,8 @@ func TestFindEndpointUtilPanic(t *testing.T) { defer testutils.SetupTestOSContext(t)() defer checkPanic(t) c, nw := createTestNetwork(t, "network") + defer c.Stop() + nid := nw.ID() findEndpoint(c, nid, "", byID, -1) } @@ -1390,6 +1421,8 @@ func TestFindServiceUtilPanic(t *testing.T) { defer testutils.SetupTestOSContext(t)() defer checkPanic(t) c, _ := createTestNetwork(t, "network") + defer c.Stop() + findService(c, "random_service", -1) } @@ -1397,6 +1430,8 @@ func TestFindEndpointUtil(t *testing.T) { defer testutils.SetupTestOSContext(t)() c, nw := createTestNetwork(t, "network") + defer c.Stop() + nid := nw.ID() ep, err := nw.CreateEndpoint("secondEp", nil) @@ -1443,7 +1478,8 @@ func TestFindEndpointUtil(t *testing.T) { t.Fatalf("Unexepected failure: %v", errRsp) } - if ep0 != ep1 || ep0 != ep2 || ep0 != ep3 || ep0 != ep4 || ep0 != ep5 { + if ep0.ID() != ep1.ID() || ep0.ID() != ep2.ID() || + ep0.ID() != ep3.ID() || ep0.ID() != ep4.ID() || ep0.ID() != ep5.ID() { t.Fatalf("Diffenrent queries returned different endpoints") } @@ -1665,6 +1701,9 @@ func TestwriteJSON(t *testing.T) { func TestHttpHandlerUninit(t *testing.T) { defer testutils.SetupTestOSContext(t)() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1732,6 +1771,9 @@ func TestHttpHandlerBadBody(t *testing.T) { rsp := newWriter() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -1765,6 +1807,9 @@ func TestEndToEnd(t *testing.T) { rsp := newWriter() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) @@ -2213,6 +2258,9 @@ func TestEndToEndErrorMessage(t *testing.T) { rsp := newWriter() + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + c, err := libnetwork.New() if err != nil { t.Fatal(err) diff --git a/cmd/dnet/dnet.go b/cmd/dnet/dnet.go index 738081b..7bac7c7 100644 --- a/cmd/dnet/dnet.go +++ b/cmd/dnet/dnet.go @@ -22,10 +22,12 @@ import ( "github.com/docker/docker/pkg/reexec" "github.com/Sirupsen/logrus" + psignal "github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/term" "github.com/docker/libnetwork" "github.com/docker/libnetwork/api" "github.com/docker/libnetwork/config" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/options" @@ -76,6 +78,7 @@ func processConfig(cfg *config.Config) []config.Option { if cfg == nil { return options } + dn := "bridge" if strings.TrimSpace(cfg.Daemon.DefaultNetwork) != "" { dn = cfg.Daemon.DefaultNetwork @@ -91,12 +94,12 @@ func processConfig(cfg *config.Config) []config.Option { if cfg.Daemon.Labels != nil { options = append(options, config.OptionLabels(cfg.Daemon.Labels)) } - if strings.TrimSpace(cfg.GlobalStore.Client.Provider) != "" { - options = append(options, config.OptionKVProvider(cfg.GlobalStore.Client.Provider)) - } - if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" { - options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address)) + + if dcfg, ok := cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() { + options = append(options, config.OptionKVProvider(dcfg.Client.Provider)) + options = append(options, config.OptionKVProviderURL(dcfg.Client.Address)) } + dOptions, err := startDiscovery(&cfg.Cluster) if err != nil { logrus.Infof("Skipping discovery : %s", err.Error()) @@ -182,8 +185,9 @@ func createDefaultNetwork(c libnetwork.NetworkController) { genericOption[netlabel.GenericData] = map[string]interface{}{ "BridgeName": nw, } - networkOption := libnetwork.NetworkOptionGeneric(genericOption) - createOptions = append(createOptions, networkOption) + createOptions = append(createOptions, + libnetwork.NetworkOptionGeneric(genericOption), + libnetwork.NetworkOptionPersist(false)) } _, err := c.NewNetwork(d, nw, createOptions...) if err != nil { @@ -214,6 +218,7 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { fmt.Println("Error starting dnetDaemon :", err) return err } + createDefaultNetwork(controller) httpHandler := api.NewHTTPHandler(controller) r := mux.NewRouter().StrictSlash(false) @@ -231,10 +236,21 @@ func (d *dnetConnection) dnetDaemon(cfgFile string) error { post.Methods("GET", "PUT", "POST", "DELETE").HandlerFunc(httpHandler) handleSignals(controller) + setupDumpStackTrap() return http.ListenAndServe(d.addr, r) } +func setupDumpStackTrap() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGUSR1) + go func() { + for range c { + psignal.DumpStacks() + } + }() +} + func handleSignals(controller libnetwork.NetworkController) { c := make(chan os.Signal, 1) signals := []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT} diff --git a/config/config.go b/config/config.go index 96c8dab..0121c06 100644 --- a/config/config.go +++ b/config/config.go @@ -7,19 +7,21 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/discovery" "github.com/docker/libkv/store" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/netlabel" ) // Config encapsulates configurations of various Libnetwork components type Config struct { - Daemon DaemonCfg - Cluster ClusterCfg - GlobalStore, LocalStore DatastoreCfg + Daemon DaemonCfg + Cluster ClusterCfg + Scopes map[string]*datastore.ScopeCfg } // DaemonCfg represents libnetwork core configuration type DaemonCfg struct { Debug bool + DataDir string DefaultNetwork string DefaultDriver string Labels []string @@ -34,26 +36,28 @@ type ClusterCfg struct { Heartbeat uint64 } -// DatastoreCfg represents Datastore configuration. -type DatastoreCfg struct { - Embedded bool - Client DatastoreClientCfg -} - -// DatastoreClientCfg represents Datastore Client-only mode configuration -type DatastoreClientCfg struct { - Provider string - Address string - Config *store.Config +// LoadDefaultScopes loads default scope configs for scopes which +// doesn't have explicit user specified configs. +func (c *Config) LoadDefaultScopes(dataDir string) { + for k, v := range datastore.DefaultScopes(dataDir) { + if _, ok := c.Scopes[k]; !ok { + c.Scopes[k] = v + } + } } // ParseConfig parses the libnetwork configuration file func ParseConfig(tomlCfgFile string) (*Config, error) { - var cfg Config - if _, err := toml.DecodeFile(tomlCfgFile, &cfg); err != nil { + cfg := &Config{ + Scopes: map[string]*datastore.ScopeCfg{}, + } + + if _, err := toml.DecodeFile(tomlCfgFile, cfg); err != nil { return nil, err } - return &cfg, nil + + cfg.LoadDefaultScopes(cfg.Daemon.DataDir) + return cfg, nil } // Option is a option setter function type used to pass varios configurations @@ -98,7 +102,10 @@ func OptionLabels(labels []string) Option { func OptionKVProvider(provider string) Option { return func(c *Config) { log.Infof("Option OptionKVProvider: %s", provider) - c.GlobalStore.Client.Provider = strings.TrimSpace(provider) + if _, ok := c.Scopes[datastore.GlobalScope]; !ok { + c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.GlobalScope].Client.Provider = strings.TrimSpace(provider) } } @@ -106,7 +113,10 @@ func OptionKVProvider(provider string) Option { func OptionKVProviderURL(url string) Option { return func(c *Config) { log.Infof("Option OptionKVProviderURL: %s", url) - c.GlobalStore.Client.Address = strings.TrimSpace(url) + if _, ok := c.Scopes[datastore.GlobalScope]; !ok { + c.Scopes[datastore.GlobalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.GlobalScope].Client.Address = strings.TrimSpace(url) } } @@ -124,6 +134,13 @@ func OptionDiscoveryAddress(address string) Option { } } +// OptionDataDir function returns an option setter for data folder +func OptionDataDir(dataDir string) Option { + return func(c *Config) { + c.Daemon.DataDir = dataDir + } +} + // ProcessOptions processes options and stores it in config func (c *Config) ProcessOptions(options ...Option) { for _, opt := range options { @@ -145,7 +162,10 @@ func IsValidName(name string) bool { func OptionLocalKVProvider(provider string) Option { return func(c *Config) { log.Infof("Option OptionLocalKVProvider: %s", provider) - c.LocalStore.Client.Provider = strings.TrimSpace(provider) + if _, ok := c.Scopes[datastore.LocalScope]; !ok { + c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.LocalScope].Client.Provider = strings.TrimSpace(provider) } } @@ -153,7 +173,10 @@ func OptionLocalKVProvider(provider string) Option { func OptionLocalKVProviderURL(url string) Option { return func(c *Config) { log.Infof("Option OptionLocalKVProviderURL: %s", url) - c.LocalStore.Client.Address = strings.TrimSpace(url) + if _, ok := c.Scopes[datastore.LocalScope]; !ok { + c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.LocalScope].Client.Address = strings.TrimSpace(url) } } @@ -161,6 +184,9 @@ func OptionLocalKVProviderURL(url string) Option { func OptionLocalKVProviderConfig(config *store.Config) Option { return func(c *Config) { log.Infof("Option OptionLocalKVProviderConfig: %v", config) - c.LocalStore.Client.Config = config + if _, ok := c.Scopes[datastore.LocalScope]; !ok { + c.Scopes[datastore.LocalScope] = &datastore.ScopeCfg{} + } + c.Scopes[datastore.LocalScope].Client.Config = config } } diff --git a/controller.go b/controller.go index 6fdac5e..dbe13a5 100644 --- a/controller.go +++ b/controller.go @@ -124,73 +124,71 @@ type ipamData struct { } type driverTable map[string]*driverData + +//type networkTable map[string]*network +//type endpointTable map[string]*endpoint type ipamTable map[string]*ipamData -type networkTable map[string]*network -type endpointTable map[string]*endpoint type sandboxTable map[string]*sandbox type controller struct { - id string - networks networkTable - drivers driverTable - ipamDrivers ipamTable - sandboxes sandboxTable - cfg *config.Config - globalStore, localStore datastore.DataStore - discovery hostdiscovery.HostDiscovery - extKeyListener net.Listener + id string + //networks networkTable + drivers driverTable + ipamDrivers ipamTable + sandboxes sandboxTable + cfg *config.Config + stores []datastore.DataStore + discovery hostdiscovery.HostDiscovery + extKeyListener net.Listener + watchCh chan *endpoint + unWatchCh chan *endpoint + svcDb map[string]svcMap sync.Mutex } // New creates a new instance of network controller. func New(cfgOptions ...config.Option) (NetworkController, error) { var cfg *config.Config + cfg = &config.Config{ + Daemon: config.DaemonCfg{ + DriverCfg: make(map[string]interface{}), + }, + Scopes: make(map[string]*datastore.ScopeCfg), + } + if len(cfgOptions) > 0 { - cfg = &config.Config{ - Daemon: config.DaemonCfg{ - DriverCfg: make(map[string]interface{}), - }, - } cfg.ProcessOptions(cfgOptions...) } + cfg.LoadDefaultScopes(cfg.Daemon.DataDir) + c := &controller{ id: stringid.GenerateRandomID(), cfg: cfg, - networks: networkTable{}, sandboxes: sandboxTable{}, drivers: driverTable{}, - ipamDrivers: ipamTable{}} - if err := initDrivers(c); err != nil { + ipamDrivers: ipamTable{}, + svcDb: make(map[string]svcMap), + } + + if err := c.initStores(); err != nil { return nil, err } - if cfg != nil { - if err := c.initGlobalStore(); err != nil { - // Failing to initalize datastore is a bad situation to be in. - // But it cannot fail creating the Controller - log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err) - } - if err := c.initLocalStore(); err != nil { - log.Debugf("Failed to Initialize LocalDatastore due to %v.", err) - } - } - - if err := initIpams(c, c.localStore, c.globalStore); err != nil { - return nil, err - } - - if cfg != nil { - if err := c.restoreFromGlobalStore(); err != nil { - log.Debugf("Failed to restore from global Datastore due to %v", err) - } + if cfg != nil && cfg.Cluster.Watcher != nil { if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil { // Failing to initalize discovery is a bad situation to be in. // But it cannot fail creating the Controller log.Debugf("Failed to Initialize Discovery : %v", err) } - if err := c.restoreFromLocalStore(); err != nil { - log.Debugf("Failed to restore from local Datastore due to %v", err) - } + } + + if err := initDrivers(c); err != nil { + return nil, err + } + + if err := initIpams(c, c.getStore(datastore.LocalScope), + c.getStore(datastore.GlobalScope)); err != nil { + return nil, err } if err := c.startExternalKeyListener(); err != nil { @@ -325,15 +323,6 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti if !config.IsValidName(name) { return nil, ErrInvalidName(name) } - // Check if a network already exists with the specified network name - c.Lock() - for _, n := range c.networks { - if n.name == name { - c.Unlock() - return nil, NetworkNameError(name) - } - } - c.Unlock() // Construct the network object network := &network{ @@ -342,13 +331,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti ipamType: ipamapi.DefaultIPAM, id: stringid.GenerateRandomID(), ctrlr: c, - endpoints: endpointTable{}, persist: true, + drvOnce: &sync.Once{}, } network.processOptions(options...) - if _, err := c.loadNetworkDriver(network); err != nil { + // Make sure we have a driver available for this network type + // before we allocate anything. + if _, err := network.driver(); err != nil { return nil, err } @@ -364,7 +355,16 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti } }() - if err = c.addNetwork(network); err != nil { + // addNetwork can be called for local scope network lazily when + // an endpoint is created after a restart and the network was + // created in previous life. Make sure you wrap around the driver + // notification of network creation in once call so that the driver + // invoked only once in case both the network and endpoint creation + // happens in the same lifetime. + network.drvOnce.Do(func() { + err = c.addNetwork(network) + }) + if err != nil { return nil, err } @@ -380,35 +380,28 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti } func (c *controller) addNetwork(n *network) error { - if _, err := c.loadNetworkDriver(n); err != nil { + d, err := n.driver() + if err != nil { return err } - n.Lock() - d := n.driver - n.Unlock() // Create the network if err := d.CreateNetwork(n.id, n.generic, n.getIPv4Data(), n.getIPv6Data()); err != nil { return err } - if n.isGlobalScoped() { - if err := n.watchEndpoints(); err != nil { - return err - } - } - c.Lock() - c.networks[n.id] = n - c.Unlock() return nil } func (c *controller) Networks() []Network { - c.Lock() - defer c.Unlock() + var list []Network - list := make([]Network, 0, len(c.networks)) - for _, n := range c.networks { + networks, err := c.getNetworksFromStore() + if err != nil { + log.Error(err) + } + + for _, n := range networks { list = append(list, n) } @@ -450,12 +443,13 @@ func (c *controller) NetworkByID(id string) (Network, error) { if id == "" { return nil, ErrInvalidID(id) } - c.Lock() - defer c.Unlock() - if n, ok := c.networks[id]; ok { - return n, nil + + n, err := c.getNetworkFromStore(id) + if err != nil { + return nil, ErrNoSuchNetwork(id) } - return nil, ErrNoSuchNetwork(id) + + return n, nil } // NewSandbox creates a new sandbox for the passed container id @@ -620,30 +614,7 @@ func (c *controller) getIpamDriver(name string) (ipamapi.Ipam, error) { } func (c *controller) Stop() { - if c.localStore != nil { - c.localStore.KVStore().Close() - } + c.closeStores() c.stopExternalKeyListener() osl.GC() } - -func (c *controller) loadNetworkDriver(n *network) (driverapi.Driver, error) { - // Check if a driver for the specified network type is available - c.Lock() - dd, ok := c.drivers[n.networkType] - c.Unlock() - if !ok { - var err error - dd, err = c.loadDriver(n.networkType) - if err != nil { - return nil, err - } - } - - n.Lock() - n.svcRecords = svcMap{} - n.driver = dd.driver - n.dataScope = dd.capability.DataScope - n.Unlock() - return dd.driver, nil -} diff --git a/driverapi/driverapi.go b/driverapi/driverapi.go index c81e2db..bd311d0 100644 --- a/driverapi/driverapi.go +++ b/driverapi/driverapi.go @@ -1,10 +1,6 @@ package driverapi -import ( - "net" - - "github.com/docker/libnetwork/datastore" -) +import "net" // NetworkPluginEndpointType represents the Endpoint Type used by Plugin system const NetworkPluginEndpointType = "NetworkDriver" @@ -105,7 +101,7 @@ type DriverCallback interface { // Capability represents the high level capabilities of the drivers which libnetwork can make use of type Capability struct { - DataScope datastore.DataScope + DataScope string } // DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on diff --git a/drivers.go b/drivers.go index b50d089..d87ec1f 100644 --- a/drivers.go +++ b/drivers.go @@ -3,6 +3,7 @@ package libnetwork import ( "strings" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipamapi" builtinIpam "github.com/docker/libnetwork/ipams/builtin" @@ -32,9 +33,9 @@ func makeDriverConfig(c *controller, ntype string) map[string]interface{} { config := make(map[string]interface{}) - if c.validateGlobalStoreConfig() { - config[netlabel.KVProvider] = c.cfg.GlobalStore.Client.Provider - config[netlabel.KVProviderURL] = c.cfg.GlobalStore.Client.Address + if dcfg, ok := c.cfg.Scopes[datastore.GlobalScope]; ok && dcfg.IsValid() { + config[netlabel.KVProvider] = dcfg.Client.Provider + config[netlabel.KVProviderURL] = dcfg.Client.Address } for _, label := range c.cfg.Daemon.Labels { diff --git a/drivers/overlay/ov_endpoint.go b/drivers/overlay/ov_endpoint.go index e3c1b88..7a861a9 100644 --- a/drivers/overlay/ov_endpoint.go +++ b/drivers/overlay/ov_endpoint.go @@ -43,12 +43,16 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo, return err } + // Since we perform lazy configuration make sure we try + // configuring the driver when we enter CreateEndpoint since + // CreateNetwork may not be called in every node. + if err := d.configure(); err != nil { + return err + } + n := d.network(nid) if n == nil { - n, err = d.createNetworkfromStore(nid) - if err != nil { - return fmt.Errorf("network id %q not found", nid) - } + return fmt.Errorf("network id %q not found", nid) } ep := &endpoint{ diff --git a/drivers/overlay/ov_network.go b/drivers/overlay/ov_network.go index cb73c9d..fa5acd2 100644 --- a/drivers/overlay/ov_network.go +++ b/drivers/overlay/ov_network.go @@ -45,12 +45,13 @@ type network struct { } func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error { - var err error if id == "" { return fmt.Errorf("invalid network id") } - if err = d.configure(); err != nil { + // Since we perform lazy configuration make sure we try + // configuring the driver when we enter CreateNetwork + if err := d.configure(); err != nil { return err } @@ -71,29 +72,16 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat n.subnets = append(n.subnets, s) } - for { - // If the datastore has the network object already - // there is no need to do a write. - err = d.store.GetObject(datastore.Key(n.Key()...), n) - if err == nil || err != datastore.ErrKeyNotFound { - break - } - - err = n.writeToStore() - if err == nil || err != datastore.ErrKeyModified { - break - } - } - - if err != nil { + if err := n.writeToStore(); err != nil { return fmt.Errorf("failed to update data store for network %v: %v", n.id, err) } + d.addNetwork(n) return nil } -func (d *driver) createNetworkfromStore(nid string) (*network, error) { +/* func (d *driver) createNetworkfromStore(nid string) (*network, error) { n := &network{ id: nid, driver: d, @@ -107,7 +95,7 @@ func (d *driver) createNetworkfromStore(nid string) (*network, error) { return nil, fmt.Errorf("unable to get network %q from data store, %v", nid, err) } return n, nil -} +}*/ func (d *driver) DeleteNetwork(nid string) error { if nid == "" { @@ -313,9 +301,34 @@ func (d *driver) deleteNetwork(nid string) { func (d *driver) network(nid string) *network { d.Lock() - defer d.Unlock() + networks := d.networks + d.Unlock() - return d.networks[nid] + n, ok := networks[nid] + if !ok { + n = d.getNetworkFromStore(nid) + if n != nil { + n.driver = d + n.endpoints = endpointTable{} + n.once = &sync.Once{} + networks[nid] = n + } + } + + return n +} + +func (d *driver) getNetworkFromStore(nid string) *network { + if d.store == nil { + return nil + } + + n := &network{id: nid} + if err := d.store.GetObject(datastore.Key(n.Key()...), n); err != nil { + return nil + } + + return n } func (n *network) sandbox() osl.Sandbox { @@ -408,30 +421,23 @@ func (n *network) SetValue(value []byte) error { subnetIP, _ := types.ParseCIDR(subnetIPstr) gwIP, _ := types.ParseCIDR(gwIPstr) - // If the network is being created by reading from the - // datastore subnets have to created. If the network - // already exists update only the subnets' vni field - if len(n.subnets) == 0 { - s := &subnet{ - subnetIP: subnetIP, - gwIP: gwIP, - vni: vni, - once: &sync.Once{}, - } - n.subnets = append(n.subnets, s) - return nil + s := &subnet{ + subnetIP: subnetIP, + gwIP: gwIP, + vni: vni, + once: &sync.Once{}, } + n.subnets = append(n.subnets, s) sNet := n.getMatchingSubnet(subnetIP) if sNet != nil { - if vni != 0 { - sNet.vni = vni - } + sNet.vni = vni } + return nil } -func (n *network) DataScope() datastore.DataScope { +func (n *network) DataScope() string { return datastore.GlobalScope } diff --git a/drivers/overlay/overlay.go b/drivers/overlay/overlay.go index 995b4f1..86a79f3 100644 --- a/drivers/overlay/overlay.go +++ b/drivers/overlay/overlay.go @@ -6,7 +6,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/idm" @@ -84,8 +83,8 @@ func (d *driver) configure() error { provURL, urlOk := d.config[netlabel.KVProviderURL] if provOk && urlOk { - cfg := &config.DatastoreCfg{ - Client: config.DatastoreClientCfg{ + cfg := &datastore.ScopeCfg{ + Client: datastore.ScopeClientCfg{ Provider: provider.(string), Address: provURL.(string), }, @@ -94,7 +93,7 @@ func (d *driver) configure() error { if confOk { cfg.Client.Config = provConfig.(*store.Config) } - d.store, err = datastore.NewDataStore(cfg) + d.store, err = datastore.NewDataStore(datastore.GlobalScope, cfg) if err != nil { err = fmt.Errorf("failed to initialize data store: %v", err) return diff --git a/endpoint.go b/endpoint.go index bc398f7..a76cb23 100644 --- a/endpoint.go +++ b/endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/options" "github.com/docker/libnetwork/types" ) @@ -107,6 +108,37 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { return nil } +func (ep *endpoint) New() datastore.KVObject { + return &endpoint{network: ep.getNetwork()} +} + +func (ep *endpoint) CopyTo(o datastore.KVObject) error { + ep.Lock() + defer ep.Unlock() + + dstEp := o.(*endpoint) + dstEp.name = ep.name + dstEp.id = ep.id + dstEp.sandboxID = ep.sandboxID + dstEp.dbIndex = ep.dbIndex + dstEp.dbExists = ep.dbExists + + if ep.iface != nil { + dstEp.iface = &endpointInterface{} + ep.iface.CopyTo(dstEp.iface) + } + + dstEp.exposedPorts = make([]types.TransportPort, len(ep.exposedPorts)) + copy(dstEp.exposedPorts, ep.exposedPorts) + + dstEp.generic = options.Generic{} + for k, v := range ep.generic { + dstEp.generic[k] = v + } + + return nil +} + func (ep *endpoint) ID() string { ep.Lock() defer ep.Unlock() @@ -122,16 +154,28 @@ func (ep *endpoint) Name() string { } func (ep *endpoint) Network() string { - return ep.getNetwork().name + if ep.network == nil { + return "" + } + + return ep.network.name } // endpoint Key structure : endpoint/network-id/endpoint-id func (ep *endpoint) Key() []string { - return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id, ep.id} + if ep.network == nil { + return nil + } + + return []string{datastore.EndpointKeyPrefix, ep.network.id, ep.id} } func (ep *endpoint) KeyPrefix() []string { - return []string{datastore.EndpointKeyPrefix, ep.getNetwork().id} + if ep.network == nil { + return nil + } + + return []string{datastore.EndpointKeyPrefix, ep.network.id} } func (ep *endpoint) networkIDFromKey(key string) (string, error) { @@ -177,7 +221,7 @@ func (ep *endpoint) Exists() bool { } func (ep *endpoint) Skip() bool { - return ep.getNetwork().Skip() + return ep.getNetwork().Skip() || ep.DataScope() == datastore.LocalScope } func (ep *endpoint) processOptions(options ...EndpointOption) { @@ -191,8 +235,22 @@ func (ep *endpoint) processOptions(options ...EndpointOption) { } } -func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error { +func (ep *endpoint) getNetwork() *network { + ep.Lock() + defer ep.Unlock() + return ep.network +} + +func (ep *endpoint) getNetworkFromStore() (*network, error) { + if ep.network == nil { + return nil, fmt.Errorf("invalid network object in endpoint %s", ep.Name()) + } + + return ep.network.ctrlr.getNetworkFromStore(ep.network.id) +} + +func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error { if sbox == nil { return types.BadRequestErrorf("endpoint cannot be joined by nil container") } @@ -215,15 +273,27 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error { return types.BadRequestErrorf("not a valid Sandbox interface") } + network, err := ep.getNetworkFromStore() + if err != nil { + return fmt.Errorf("failed to get network from store during join: %v", err) + } + + ep, err = network.getEndpointFromStore(ep.ID()) + if err != nil { + return fmt.Errorf("failed to get endpoint from store during join: %v", err) + } + ep.Lock() if ep.sandboxID != "" { ep.Unlock() return types.ForbiddenErrorf("a sandbox has already joined the endpoint") } + ep.Unlock() + ep.Lock() + ep.network = network ep.sandboxID = sbox.ID() ep.joinInfo = &endpointJoinInfo{} - network := ep.network epid := ep.id ep.Unlock() defer func() { @@ -235,12 +305,16 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error { }() network.Lock() - driver := network.driver nid := network.id network.Unlock() ep.processOptions(options...) + driver, err := network.driver() + if err != nil { + return fmt.Errorf("failed to join endpoint: %v", err) + } + err = driver.Join(nid, epid, sbox.Key(), ep, sbox.Labels()) if err != nil { return err @@ -262,14 +336,15 @@ func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error { return err } - if err = sb.updateDNS(ep.getNetwork().enableIPv6); err != nil { + // Watch for service records + network.getController().watchSvcRecord(ep) + + if err = sb.updateDNS(network.enableIPv6); err != nil { return err } - if !ep.isLocalScoped() { - if err = network.ctrlr.updateToStore(ep); err != nil { - return err - } + if err = network.getController().updateToStore(ep); err != nil { + return err } sb.Lock() @@ -327,6 +402,16 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { return types.BadRequestErrorf("not a valid Sandbox interface") } + n, err := ep.getNetworkFromStore() + if err != nil { + return fmt.Errorf("failed to get network from store during leave: %v", err) + } + + ep, err = n.getEndpointFromStore(ep.ID()) + if err != nil { + return fmt.Errorf("failed to get endpoint from store during leave: %v", err) + } + ep.Lock() sid := ep.sandboxID ep.Unlock() @@ -342,21 +427,19 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { ep.Lock() ep.sandboxID = "" - n := ep.network + ep.network = n ep.Unlock() - n.Lock() - c := n.ctrlr - d := n.driver - n.Unlock() + if err := n.getController().updateToStore(ep); err != nil { + ep.Lock() + ep.sandboxID = sid + ep.Unlock() + return err + } - if !ep.isLocalScoped() { - if err := c.updateToStore(ep); err != nil { - ep.Lock() - ep.sandboxID = sid - ep.Unlock() - return err - } + d, err := n.driver() + if err != nil { + return fmt.Errorf("failed to leave endpoint: %v", err) } if err := d.Leave(n.id, ep.id); err != nil { @@ -367,6 +450,9 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { return err } + // unwatch for service records + n.getController().unWatchSvcRecord(ep) + if sb.needDefaultGW() { ep := sb.getEPwithoutGateway() if ep == nil { @@ -379,49 +465,48 @@ func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error { func (ep *endpoint) Delete() error { var err error + n, err := ep.getNetworkFromStore() + if err != nil { + return fmt.Errorf("failed to get network during Delete: %v", err) + } + + ep, err = n.getEndpointFromStore(ep.ID()) + if err != nil { + return fmt.Errorf("failed to get endpoint from store during Delete: %v", err) + } + ep.Lock() epid := ep.id name := ep.name - n := ep.network if ep.sandboxID != "" { ep.Unlock() return &ActiveContainerError{name: name, id: epid} } - n.Lock() - ctrlr := n.ctrlr - n.Unlock() ep.Unlock() - if !ep.isLocalScoped() { - if err = ctrlr.deleteFromStore(ep); err != nil { - return err - } - } - defer func() { - if err != nil { - ep.dbExists = false - if !ep.isLocalScoped() { - if e := ctrlr.updateToStore(ep); e != nil { - log.Warnf("failed to recreate endpoint in store %s : %v", name, e) - } - } - } - }() - - // Update the endpoint count in network and update it in the datastore - n.DecEndpointCnt() - if err = ctrlr.updateToStore(n); err != nil { + if err = n.DecEndpointCnt(); err != nil { return err } defer func() { if err != nil { - n.IncEndpointCnt() - if e := ctrlr.updateToStore(n); e != nil { + if e := n.IncEndpointCnt(); e != nil { log.Warnf("failed to update network %s : %v", n.name, e) } } }() + if err = n.getController().deleteFromStore(ep); err != nil { + return err + } + defer func() { + if err != nil { + ep.dbExists = false + if e := n.getController().updateToStore(ep); e != nil { + log.Warnf("failed to recreate endpoint in store %s : %v", name, e) + } + } + }() + if err = ep.deleteEndpoint(); err != nil { return err } @@ -438,38 +523,21 @@ func (ep *endpoint) deleteEndpoint() error { epid := ep.id ep.Unlock() - n.Lock() - _, ok := n.endpoints[epid] - if !ok { - n.Unlock() - return nil + driver, err := n.driver() + if err != nil { + return fmt.Errorf("failed to delete endpoint: %v", err) } - nid := n.id - driver := n.driver - delete(n.endpoints, epid) - n.Unlock() - - if err := driver.DeleteEndpoint(nid, epid); err != nil { + if err := driver.DeleteEndpoint(n.id, epid); err != nil { if _, ok := err.(types.ForbiddenError); ok { - n.Lock() - n.endpoints[epid] = ep - n.Unlock() return err } log.Warnf("driver error deleting endpoint %s : %v", name, err) } - n.updateSvcRecord(ep, false) return nil } -func (ep *endpoint) getNetwork() *network { - ep.Lock() - defer ep.Unlock() - return ep.network -} - func (ep *endpoint) getSandbox() (*sandbox, bool) { ep.Lock() c := ep.network.getController() @@ -545,14 +613,8 @@ func JoinOptionPriority(ep Endpoint, prio int) EndpointOption { } } -func (ep *endpoint) DataScope() datastore.DataScope { - ep.Lock() - defer ep.Unlock() - return ep.network.dataScope -} - -func (ep *endpoint) isLocalScoped() bool { - return ep.DataScope() == datastore.LocalScope +func (ep *endpoint) DataScope() string { + return ep.getNetwork().DataScope() } func (ep *endpoint) assignAddress() error { diff --git a/endpoint_info.go b/endpoint_info.go index 4e25fec..7c765f4 100644 --- a/endpoint_info.go +++ b/endpoint_info.go @@ -2,6 +2,7 @@ package libnetwork import ( "encoding/json" + "fmt" "net" "github.com/docker/libnetwork/driverapi" @@ -115,6 +116,21 @@ func (epi *endpointInterface) UnmarshalJSON(b []byte) error { return nil } +func (epi *endpointInterface) CopyTo(dstEpi *endpointInterface) error { + dstEpi.mac = types.GetMacCopy(epi.mac) + dstEpi.addr = types.GetIPNetCopy(epi.addr) + dstEpi.addrv6 = types.GetIPNetCopy(epi.addrv6) + dstEpi.srcName = epi.srcName + dstEpi.dstPrefix = epi.dstPrefix + dstEpi.poolID = epi.poolID + + for _, route := range epi.routes { + dstEpi.routes = append(dstEpi.routes, types.GetIPNetCopy(route)) + } + + return nil +} + type endpointJoinInfo struct { gw net.IP gw6 net.IP @@ -122,21 +138,38 @@ type endpointJoinInfo struct { } func (ep *endpoint) Info() EndpointInfo { - return ep + n, err := ep.getNetworkFromStore() + if err != nil { + return nil + } + + ep, err = n.getEndpointFromStore(ep.ID()) + if err != nil { + return nil + } + + sb, ok := ep.getSandbox() + if !ok { + // endpoint hasn't joined any sandbox. + // Just return the endpoint + return ep + } + + return sb.getEndpoint(ep.ID()) } func (ep *endpoint) DriverInfo() (map[string]interface{}, error) { - ep.Lock() - network := ep.network - epid := ep.id - ep.Unlock() + n, err := ep.getNetworkFromStore() + if err != nil { + return nil, fmt.Errorf("could not find network in store for driver info: %v", err) + } - network.Lock() - driver := network.driver - nid := network.id - network.Unlock() + driver, err := n.driver() + if err != nil { + return nil, fmt.Errorf("failed to get driver info: %v", err) + } - return driver.EndpointOperInfo(nid, epid) + return driver.EndpointOperInfo(n.ID(), ep.ID()) } func (ep *endpoint) Iface() InterfaceInfo { diff --git a/libnetwork_internal_test.go b/libnetwork_internal_test.go index 2b5e20d..f59b41a 100644 --- a/libnetwork_internal_test.go +++ b/libnetwork_internal_test.go @@ -6,7 +6,6 @@ import ( "net" "testing" - "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/types" @@ -32,11 +31,6 @@ func TestDriverRegistration(t *testing.T) { } } -func SetTestDataStore(c NetworkController, custom datastore.DataStore) { - con := c.(*controller) - con.globalStore = custom -} - func TestNetworkMarshalling(t *testing.T) { n := &network{ name: "Miao", diff --git a/libnetwork_test.go b/libnetwork_test.go index 8743683..db9ec3e 100644 --- a/libnetwork_test.go +++ b/libnetwork_test.go @@ -50,7 +50,7 @@ func TestMain(m *testing.M) { os.Exit(1) } - libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore())) + //libnetwork.SetTestDataStore(controller, datastore.NewCustomDataStore(datastore.NewMockStore())) x := m.Run() controller.Stop() @@ -60,6 +60,9 @@ func TestMain(m *testing.M) { func createController() error { var err error + // Cleanup local datastore file + os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) + option := options.Generic{ "EnableIPForwarding": true, } @@ -358,27 +361,6 @@ func TestNilRemoteDriver(t *testing.T) { } } -func TestDuplicateNetwork(t *testing.T) { - if !testutils.IsRunningInContainer() { - defer testutils.SetupTestOSContext(t)() - } - - // Creating a default bridge name network (can't be removed) - _, err := controller.NewNetwork(bridgeNetType, "testdup") - if err != nil { - t.Fatal(err) - } - - _, err = controller.NewNetwork(bridgeNetType, "testdup") - if err == nil { - t.Fatal("Expected to fail. But instead succeeded") - } - - if _, ok := err.(libnetwork.NetworkNameError); !ok { - t.Fatalf("Did not fail with expected error. Actual error: %v", err) - } -} - func TestNetworkName(t *testing.T) { if !testutils.IsRunningInContainer() { defer testutils.SetupTestOSContext(t)() @@ -703,7 +685,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) { if netWanted == nil { t.Fatal(err) } - if net1 != netWanted { + if net1.ID() != netWanted.ID() { t.Fatal(err) } @@ -712,7 +694,7 @@ func TestNetworkEndpointsWalkers(t *testing.T) { if netWanted == nil { t.Fatal(err) } - if net2 != netWanted { + if net2.ID() != netWanted.ID() { t.Fatal(err) } } @@ -843,7 +825,7 @@ func TestControllerQuery(t *testing.T) { if err != nil { t.Fatalf("Unexpected failure for NetworkByID(): %v", err) } - if net1 != g { + if net1.ID() != g.ID() { t.Fatalf("NetworkByID() returned unexpected element: %v", g) } @@ -863,7 +845,7 @@ func TestControllerQuery(t *testing.T) { if err != nil { t.Fatalf("Unexpected failure for NetworkByID(): %v", err) } - if net2 != g { + if net2.ID() != g.ID() { t.Fatalf("NetworkByID() returned unexpected element: %v", g) } } @@ -940,7 +922,7 @@ func TestNetworkQuery(t *testing.T) { if err != nil { t.Fatal(err) } - if ep12 != e { + if ep12.ID() != e.ID() { t.Fatalf("EndpointByID() returned %v instead of %v", e, ep12) } diff --git a/network.go b/network.go index 94a56c2..f659259 100644 --- a/network.go +++ b/network.go @@ -2,6 +2,7 @@ package libnetwork import ( "encoding/json" + "fmt" "net" "sync" @@ -127,7 +128,6 @@ type network struct { networkType string id string ipamType string - driver driverapi.Driver addrSpace string ipamV4Config []*IpamConf ipamV6Config []*IpamConf @@ -135,14 +135,14 @@ type network struct { ipamV6Info []*IpamInfo enableIPv6 bool endpointCnt uint64 - endpoints endpointTable generic options.Generic dbIndex uint64 svcRecords svcMap dbExists bool persist bool stopWatchCh chan struct{} - dataScope datastore.DataScope + scope string + drvOnce *sync.Once sync.Mutex } @@ -164,11 +164,7 @@ func (n *network) Type() string { n.Lock() defer n.Unlock() - if n.driver == nil { - return "" - } - - return n.driver.Type() + return n.networkType } func (n *network) Key() []string { @@ -220,10 +216,72 @@ func (n *network) Skip() bool { return !n.persist } -func (n *network) DataScope() datastore.DataScope { +func (n *network) New() datastore.KVObject { n.Lock() defer n.Unlock() - return n.dataScope + + return &network{ + ctrlr: n.ctrlr, + drvOnce: &sync.Once{}, + } +} + +// CopyTo deep copies to the destination IpamInfo +func (i *IpamInfo) CopyTo(dstI *IpamInfo) error { + dstI.PoolID = i.PoolID + if i.Meta != nil { + dstI.Meta = make(map[string]string) + for k, v := range i.Meta { + dstI.Meta[k] = v + } + } + + dstI.AddressSpace = i.AddressSpace + dstI.Pool = types.GetIPNetCopy(i.Pool) + dstI.Gateway = types.GetIPNetCopy(i.Gateway) + + if i.AuxAddresses != nil { + dstI.AuxAddresses = make(map[string]*net.IPNet) + for k, v := range i.AuxAddresses { + dstI.AuxAddresses[k] = types.GetIPNetCopy(v) + } + } + + return nil +} + +func (n *network) CopyTo(o datastore.KVObject) error { + n.Lock() + defer n.Unlock() + + dstN := o.(*network) + dstN.name = n.name + dstN.id = n.id + dstN.networkType = n.networkType + dstN.ipamType = n.ipamType + dstN.endpointCnt = n.endpointCnt + dstN.enableIPv6 = n.enableIPv6 + dstN.persist = n.persist + dstN.dbIndex = n.dbIndex + dstN.dbExists = n.dbExists + dstN.drvOnce = n.drvOnce + + for _, v4info := range n.ipamV4Info { + dstV4Info := &IpamInfo{} + v4info.CopyTo(dstV4Info) + dstN.ipamV4Info = append(dstN.ipamV4Info, dstV4Info) + } + + dstN.generic = options.Generic{} + for k, v := range n.generic { + dstN.generic[k] = v + } + + return nil +} + +func (n *network) DataScope() string { + return n.driverScope() } func (n *network) EndpointCnt() uint64 { @@ -232,16 +290,20 @@ func (n *network) EndpointCnt() uint64 { return n.endpointCnt } -func (n *network) IncEndpointCnt() { +func (n *network) IncEndpointCnt() error { n.Lock() n.endpointCnt++ n.Unlock() + + return n.getController().updateToStore(n) } -func (n *network) DecEndpointCnt() { +func (n *network) DecEndpointCnt() error { n.Lock() n.endpointCnt-- n.Unlock() + + return n.getController().updateToStore(n) } // TODO : Can be made much more generic with the help of reflection (but has some golang limitations) @@ -372,17 +434,55 @@ func (n *network) processOptions(options ...NetworkOption) { } } -func (n *network) Delete() error { - var err error +func (n *network) driverScope() string { + c := n.getController() - ctrlr := n.getController() - - ctrlr.Lock() - _, ok := ctrlr.networks[n.id] - ctrlr.Unlock() + c.Lock() + // Check if a driver for the specified network type is available + dd, ok := c.drivers[n.networkType] + c.Unlock() if !ok { - return &UnknownNetworkError{name: n.name, id: n.id} + var err error + dd, err = c.loadDriver(n.networkType) + if err != nil { + // If driver could not be resolved simply return an empty string + return "" + } + } + + return dd.capability.DataScope +} + +func (n *network) driver() (driverapi.Driver, error) { + c := n.getController() + + c.Lock() + // Check if a driver for the specified network type is available + dd, ok := c.drivers[n.networkType] + c.Unlock() + + if !ok { + var err error + dd, err = c.loadDriver(n.networkType) + if err != nil { + return nil, err + } + } + + return dd.driver, nil +} + +func (n *network) Delete() error { + n.Lock() + c := n.ctrlr + name := n.name + id := n.id + n.Unlock() + + n, err := c.getNetworkFromStore(id) + if err != nil { + return &UnknownNetworkError{name: name, id: id} } numEps := n.EndpointCnt() @@ -390,9 +490,22 @@ func (n *network) Delete() error { return &ActiveEndpointsError{name: n.name, id: n.id} } - // deleteNetworkFromStore performs an atomic delete operation and the network.endpointCnt field will help - // prevent any possible race between endpoint join and network delete - if err = ctrlr.deleteFromStore(n); err != nil { + if err = n.deleteNetwork(); err != nil { + return err + } + defer func() { + if err != nil { + if e := c.addNetwork(n); e != nil { + log.Warnf("failed to rollback deleteNetwork for network %s: %v", + n.Name(), err) + } + } + }() + + // deleteFromStore performs an atomic delete operation and the + // network.endpointCnt field will help prevent any possible + // race between endpoint join and network delete + if err = n.getController().deleteFromStore(n); err != nil { if err == datastore.ErrKeyModified { return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.") } @@ -402,65 +515,68 @@ func (n *network) Delete() error { defer func() { if err != nil { n.dbExists = false - if e := ctrlr.updateToStore(n); e != nil { + if e := n.getController().updateToStore(n); e != nil { log.Warnf("failed to recreate network in store %s : %v", n.name, e) } } }() - if err = n.deleteNetwork(); err != nil { - return err - } - n.ipamRelease() return nil } func (n *network) deleteNetwork() error { - n.Lock() - id := n.id - d := n.driver - n.ctrlr.Lock() - delete(n.ctrlr.networks, id) - n.ctrlr.Unlock() - n.Unlock() + d, err := n.driver() + if err != nil { + return fmt.Errorf("failed deleting network: %v", err) + } - if err := d.DeleteNetwork(n.id); err != nil { + // If it is bridge network type make sure we call the driver about the network + // because the network may have been created in some past life of libnetwork. + if n.Type() == "bridge" { + n.drvOnce.Do(func() { + err = n.getController().addNetwork(n) + }) + if err != nil { + return err + } + } + + if err := d.DeleteNetwork(n.ID()); err != nil { // Forbidden Errors should be honored if _, ok := err.(types.ForbiddenError); ok { - n.ctrlr.Lock() - n.ctrlr.networks[n.id] = n - n.ctrlr.Unlock() return err } log.Warnf("driver error deleting network %s : %v", n.name, err) } - n.stopWatch() + return nil } func (n *network) addEndpoint(ep *endpoint) error { - var err error - n.Lock() - n.endpoints[ep.id] = ep - d := n.driver - n.Unlock() + d, err := n.driver() + if err != nil { + return fmt.Errorf("failed to add endpoint: %v", err) + } - defer func() { + // If it is bridge network type make sure we call the driver about the network + // because the network may have been created in some past life of libnetwork. + if n.Type() == "bridge" { + n.drvOnce.Do(func() { + err = n.getController().addNetwork(n) + }) if err != nil { - n.Lock() - delete(n.endpoints, ep.id) - n.Unlock() + return err } - }() + } err = d.CreateEndpoint(n.id, ep.id, ep.Interface(), ep.generic) if err != nil { - return types.InternalErrorf("failed to create endpoint %s on network %s: %v", ep.Name(), n.Name(), err) + return types.InternalErrorf("failed to create endpoint %s on network %s: %v", + ep.Name(), n.Name(), err) } - n.updateSvcRecord(ep, true) return nil } @@ -476,7 +592,16 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}} ep.id = stringid.GenerateRandomID() + + // Initialize ep.network with a possibly stale copy of n. We need this to get network from + // store. But once we get it from store we will have the most uptodate copy possible. ep.network = n + ep.network, err = ep.getNetworkFromStore() + if err != nil { + return nil, fmt.Errorf("failed to get network during CreateEndpoint: %v", err) + } + n = ep.network + ep.processOptions(options...) if err = ep.assignAddress(); err != nil { @@ -488,46 +613,46 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi } }() - ctrlr := n.getController() - - n.IncEndpointCnt() - if err = ctrlr.updateToStore(n); err != nil { - return nil, err - } - defer func() { - if err != nil { - n.DecEndpointCnt() - if err = ctrlr.updateToStore(n); err != nil { - log.Warnf("endpoint count cleanup failed when updating network for %s : %v", name, err) - } - } - }() if err = n.addEndpoint(ep); err != nil { return nil, err } defer func() { if err != nil { - if e := ep.Delete(); ep != nil { + if e := ep.deleteEndpoint(); e != nil { log.Warnf("cleaning up endpoint failed %s : %v", name, e) } } }() - if !ep.isLocalScoped() { - if err = ctrlr.updateToStore(ep); err != nil { - return nil, err + if err = n.getController().updateToStore(ep); err != nil { + return nil, err + } + defer func() { + if err != nil { + if e := n.getController().deleteFromStore(ep); e != nil { + log.Warnf("error rolling back endpoint %s from store: %v", name, e) + } } + }() + + // Increment endpoint count to indicate completion of endpoint addition + if err = n.IncEndpointCnt(); err != nil { + return nil, err } return ep, nil } func (n *network) Endpoints() []Endpoint { - n.Lock() - defer n.Unlock() - list := make([]Endpoint, 0, len(n.endpoints)) - for _, e := range n.endpoints { - list = append(list, e) + var list []Endpoint + + endpoints, err := n.getEndpointsFromStore() + if err != nil { + log.Error(err) + } + + for _, ep := range endpoints { + list = append(list, ep) } return list @@ -568,28 +693,32 @@ func (n *network) EndpointByID(id string) (Endpoint, error) { if id == "" { return nil, ErrInvalidID(id) } - n.Lock() - defer n.Unlock() - if e, ok := n.endpoints[id]; ok { - return e, nil + + ep, err := n.getEndpointFromStore(id) + if err != nil { + return nil, ErrNoSuchEndpoint(id) } - return nil, ErrNoSuchEndpoint(id) + + return ep, nil } -func (n *network) isGlobalScoped() bool { - return n.DataScope() == datastore.GlobalScope -} +func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool) { + c := n.getController() + sr, ok := c.svcDb[n.ID()] + if !ok { + c.svcDb[n.ID()] = svcMap{} + sr = c.svcDb[n.ID()] + } -func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) { n.Lock() var recs []etchosts.Record if iface := ep.Iface(); iface.Address() != nil { if isAdd { - n.svcRecords[ep.Name()] = iface.Address().IP - n.svcRecords[ep.Name()+"."+n.name] = iface.Address().IP + sr[ep.Name()] = iface.Address().IP + sr[ep.Name()+"."+n.name] = iface.Address().IP } else { - delete(n.svcRecords, ep.Name()) - delete(n.svcRecords, ep.Name()+"."+n.name) + delete(sr, ep.Name()) + delete(sr, ep.Name()+"."+n.name) } recs = append(recs, etchosts.Record{ @@ -610,12 +739,11 @@ func (n *network) updateSvcRecord(ep *endpoint, isAdd bool) { } var sbList []*sandbox - n.WalkEndpoints(func(e Endpoint) bool { - if sb, hasSandbox := e.(*endpoint).getSandbox(); hasSandbox { + for _, ep := range localEps { + if sb, hasSandbox := ep.getSandbox(); hasSandbox { sbList = append(sbList, sb) } - return false - }) + } for _, sb := range sbList { if isAdd { @@ -631,7 +759,9 @@ func (n *network) getSvcRecords() []etchosts.Record { defer n.Unlock() var recs []etchosts.Record - for h, ip := range n.svcRecords { + sr, _ := n.ctrlr.svcDb[n.id] + + for h, ip := range sr { recs = append(recs, etchosts.Record{ Hosts: h, IP: ip.String(), @@ -799,7 +929,7 @@ func (n *network) deriveAddressSpace() (string, error) { if !ok { return "", types.NotFoundErrorf("could not find ipam driver %s to get default address space", n.ipamType) } - if n.isGlobalScoped() { + if n.DataScope() == datastore.GlobalScope { return ipd.defaultGlobalAddressSpace, nil } return ipd.defaultLocalAddressSpace, nil diff --git a/sandbox.go b/sandbox.go index 303daa8..1125fe2 100644 --- a/sandbox.go +++ b/sandbox.go @@ -247,6 +247,19 @@ func (sb *sandbox) getConnectedEndpoints() []*endpoint { return eps } +func (sb *sandbox) getEndpoint(id string) *endpoint { + sb.Lock() + defer sb.Unlock() + + for _, ep := range sb.endpoints { + if ep.id == id { + return ep + } + } + + return nil +} + func (sb *sandbox) updateGateway(ep *endpoint) error { sb.Lock() osSbox := sb.osSbox @@ -359,7 +372,13 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error { return nil } -func (sb *sandbox) clearNetworkResources(ep *endpoint) error { +func (sb *sandbox) clearNetworkResources(origEp *endpoint) error { + ep := sb.getEndpoint(origEp.id) + if ep == nil { + return fmt.Errorf("could not find the sandbox endpoint data for endpoint %s", + ep.name) + } + sb.Lock() osSbox := sb.osSbox sb.Unlock() @@ -837,7 +856,7 @@ func (eh epHeap) Less(i, j int) bool { cjp = 0 } if cip == cjp { - return eh[i].getNetwork().Name() < eh[j].getNetwork().Name() + return eh[i].network.Name() < eh[j].network.Name() } return cip > cjp diff --git a/sandbox_test.go b/sandbox_test.go index 0f47248..b17275c 100644 --- a/sandbox_test.go +++ b/sandbox_test.go @@ -115,21 +115,21 @@ func TestSandboxAddMultiPrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep3 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() { t.Fatal("Expected ep3 to be at the top of the heap. But did not find ep3 at the top of the heap") } if err := ep3.Leave(sbx); err != nil { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep2 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() { t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap") } if err := ep2.Leave(sbx); err != nil { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep1 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() { t.Fatal("Expected ep1 to be at the top of the heap after removing ep2. But did not find ep1 at the top of the heap") } @@ -138,7 +138,7 @@ func TestSandboxAddMultiPrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep3 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep3.ID() { t.Fatal("Expected ep3 to be at the top of the heap after adding ep3 back. But did not find ep3 at the top of the heap") } @@ -185,7 +185,7 @@ func TestSandboxAddSamePrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep1 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep1.ID() { t.Fatal("Expected ep1 to be at the top of the heap. But did not find ep1 at the top of the heap") } @@ -193,7 +193,7 @@ func TestSandboxAddSamePrio(t *testing.T) { t.Fatal(err) } - if ctrlr.sandboxes[sid].endpoints[0] != ep2 { + if ctrlr.sandboxes[sid].endpoints[0].ID() != ep2.ID() { t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap") } diff --git a/store.go b/store.go index 9cbbfc5..a45aa3b 100644 --- a/store.go +++ b/store.go @@ -1,408 +1,348 @@ package libnetwork import ( - "encoding/json" "fmt" - "time" log "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" ) -var ( - defaultBoltTimeout = 3 * time.Second - defaultLocalStoreConfig = config.DatastoreCfg{ - Embedded: true, - Client: config.DatastoreClientCfg{ - Provider: "boltdb", - Address: defaultPrefix + "/boltdb.db", - Config: &store.Config{ - Bucket: "libnetwork", - ConnectionTimeout: defaultBoltTimeout, - }, - }, - } -) - -func (c *controller) validateGlobalStoreConfig() bool { - return c.cfg != nil && c.cfg.GlobalStore.Client.Provider != "" && c.cfg.GlobalStore.Client.Address != "" -} - -func (c *controller) initGlobalStore() error { +func (c *controller) initStores() error { c.Lock() - cfg := c.cfg - c.Unlock() - if !c.validateGlobalStoreConfig() { - return fmt.Errorf("globalstore initialization requires a valid configuration") - } - - store, err := datastore.NewDataStore(&cfg.GlobalStore) - if err != nil { - return err - } - c.Lock() - c.globalStore = store - c.Unlock() - return nil -} - -func (c *controller) initLocalStore() error { - c.Lock() - cfg := c.cfg - c.Unlock() - localStore, err := datastore.NewDataStore(c.getLocalStoreConfig(cfg)) - if err != nil { - return err - } - c.Lock() - c.localStore = localStore - c.Unlock() - return nil -} - -func (c *controller) restoreFromGlobalStore() error { - c.Lock() - s := c.globalStore - c.Unlock() - if s == nil { - return nil - } - c.restore("global") - return c.watchNetworks() -} - -func (c *controller) restoreFromLocalStore() error { - c.Lock() - s := c.localStore - c.Unlock() - if s != nil { - c.restore("local") - } - return nil -} - -func (c *controller) restore(store string) { - nws, err := c.getNetworksFromStore(store == "global") - if err == nil { - c.processNetworkUpdate(nws, nil) - } else if err != datastore.ErrKeyNotFound { - log.Warnf("failed to read networks from %s store during init : %v", store, err) - } -} - -func (c *controller) getNetworksFromStore(global bool) ([]*store.KVPair, error) { - var cs datastore.DataStore - c.Lock() - if global { - cs = c.globalStore - } else { - cs = c.localStore - } - c.Unlock() - return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix)) -} - -func (c *controller) newNetworkFromStore(n *network) error { - n.Lock() - n.ctrlr = c - n.endpoints = endpointTable{} - n.Unlock() - - return c.addNetwork(n) -} - -func (c *controller) newEndpointFromStore(key string, ep *endpoint) error { - ep.Lock() - n := ep.network - id := ep.id - ep.Unlock() - - _, err := n.EndpointByID(id) - if err != nil { - if _, ok := err.(ErrNoSuchEndpoint); ok { - return n.addEndpoint(ep) - } - } - return err -} - -func (c *controller) updateToStore(kvObject datastore.KV) error { - if kvObject.Skip() { - return nil - } - cs := c.getDataStore(kvObject.DataScope()) - if cs == nil { - log.Debugf("datastore not initialized. kv object %s is not added to the store", datastore.Key(kvObject.Key()...)) - return nil - } - - return cs.PutObjectAtomic(kvObject) -} - -func (c *controller) deleteFromStore(kvObject datastore.KV) error { - if kvObject.Skip() { - return nil - } - cs := c.getDataStore(kvObject.DataScope()) - if cs == nil { - log.Debugf("datastore not initialized. kv object %s is not deleted from datastore", datastore.Key(kvObject.Key()...)) - return nil - } - - if err := cs.DeleteObjectAtomic(kvObject); err != nil { - return err - } - - return nil -} - -func (c *controller) watchNetworks() error { - if !c.validateGlobalStoreConfig() { - return nil - } - - c.Lock() - cs := c.globalStore - c.Unlock() - - networkKey := datastore.Key(datastore.NetworkKeyPrefix) - if err := ensureKeys(networkKey, cs); err != nil { - return fmt.Errorf("failed to ensure if the network keys are valid and present in store: %v", err) - } - nwPairs, err := cs.KVStore().WatchTree(networkKey, nil) - if err != nil { - return err - } - go func() { - for { - select { - case nws := <-nwPairs: - c.Lock() - tmpview := networkTable{} - lview := c.networks - c.Unlock() - for k, v := range lview { - if v.isGlobalScoped() { - tmpview[k] = v - } - } - c.processNetworkUpdate(nws, &tmpview) - - // Delete processing - for k := range tmpview { - c.Lock() - existing, ok := c.networks[k] - c.Unlock() - if !ok { - continue - } - tmp := network{} - if err := c.globalStore.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { - continue - } - if err := existing.deleteNetwork(); err != nil { - log.Debugf("Delete failed %s: %s", existing.name, err) - } - } - } - } - }() - return nil -} - -func (n *network) watchEndpoints() error { - if n.Skip() || !n.ctrlr.validateGlobalStoreConfig() { - return nil - } - - n.Lock() - cs := n.ctrlr.globalStore - tmp := endpoint{network: n} - n.stopWatchCh = make(chan struct{}) - stopCh := n.stopWatchCh - n.Unlock() - - endpointKey := datastore.Key(tmp.KeyPrefix()...) - if err := ensureKeys(endpointKey, cs); err != nil { - return fmt.Errorf("failed to ensure if the endpoint keys are valid and present in store: %v", err) - } - epPairs, err := cs.KVStore().WatchTree(endpointKey, stopCh) - if err != nil { - return err - } - go func() { - for { - select { - case <-stopCh: - return - case eps := <-epPairs: - n.Lock() - tmpview := endpointTable{} - lview := n.endpoints - n.Unlock() - for k, v := range lview { - if v.network.isGlobalScoped() { - tmpview[k] = v - } - } - n.ctrlr.processEndpointsUpdate(eps, &tmpview) - // Delete processing - for k := range tmpview { - n.Lock() - existing, ok := n.endpoints[k] - n.Unlock() - if !ok { - continue - } - tmp := endpoint{} - if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound { - continue - } - if err := existing.deleteEndpoint(); err != nil { - log.Debugf("Delete failed %s: %s", existing.name, err) - } - } - } - } - }() - return nil -} - -func (n *network) stopWatch() { - n.Lock() - if n.stopWatchCh != nil { - close(n.stopWatchCh) - n.stopWatchCh = nil - } - n.Unlock() -} - -func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) { - for _, kve := range nws { - var n network - err := json.Unmarshal(kve.Value, &n) - if err != nil { - log.Error(err) - continue - } - if prune != nil { - delete(*prune, n.id) - } - n.SetIndex(kve.LastIndex) - c.Lock() - existing, ok := c.networks[n.id] + if c.cfg == nil { c.Unlock() - if ok { - existing.Lock() - // Skip existing network update - if existing.dbIndex != n.Index() { - // Can't use SetIndex() since existing is locked. - existing.dbIndex = n.Index() - existing.dbExists = true - existing.endpointCnt = n.endpointCnt - } - existing.Unlock() - continue - } - - if err = c.newNetworkFromStore(&n); err != nil { - log.Error(err) - } - } -} - -func (c *controller) processEndpointUpdate(ep *endpoint) bool { - nw := ep.network - if nw == nil { - return true - } - nw.Lock() - id := nw.id - nw.Unlock() - - c.Lock() - n, ok := c.networks[id] - c.Unlock() - if !ok { - return true - } - existing, _ := n.EndpointByID(ep.id) - if existing == nil { - return true - } - - ee := existing.(*endpoint) - ee.Lock() - if ee.dbIndex != ep.Index() { - // Can't use SetIndex() because ee is locked. - ee.dbIndex = ep.Index() - ee.dbExists = true - ee.sandboxID = ep.sandboxID - } - ee.Unlock() - - return false -} - -func ensureKeys(key string, cs datastore.DataStore) error { - exists, err := cs.KVStore().Exists(key) - if err != nil { - return err - } - if exists { return nil } - return cs.KVStore().Put(key, []byte{}, nil) -} + scopeConfigs := c.cfg.Scopes + c.Unlock() -func (c *controller) getLocalStoreConfig(cfg *config.Config) *config.DatastoreCfg { - if cfg != nil && cfg.LocalStore.Client.Provider != "" && cfg.LocalStore.Client.Address != "" { - return &cfg.LocalStore + for scope, scfg := range scopeConfigs { + store, err := datastore.NewDataStore(scope, scfg) + if err != nil { + return err + } + c.Lock() + c.stores = append(c.stores, store) + c.Unlock() } - return &defaultLocalStoreConfig + + c.startWatch() + return nil } -func (c *controller) getDataStore(dataScope datastore.DataScope) (dataStore datastore.DataStore) { +func (c *controller) closeStores() { + for _, store := range c.getStores() { + store.Close() + } +} + +func (c *controller) getStore(scope string) datastore.DataStore { c.Lock() - if dataScope == datastore.GlobalScope { - dataStore = c.globalStore - } else if dataScope == datastore.LocalScope { - dataStore = c.localStore + defer c.Unlock() + + for _, store := range c.stores { + if store.Scope() == scope { + return store + } + } + + return nil +} + +func (c *controller) getStores() []datastore.DataStore { + c.Lock() + defer c.Unlock() + + return c.stores +} + +func (c *controller) getNetworkFromStore(nid string) (*network, error) { + for _, store := range c.getStores() { + n := &network{id: nid, ctrlr: c} + err := store.GetObject(datastore.Key(n.Key()...), n) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("could not find network %s: %v", nid, err) + } + + // Continue searching in the next store if the key is not found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + return n, nil + } + + return nil, fmt.Errorf("network %s not found", nid) +} + +func (c *controller) getNetworksFromStore() ([]*network, error) { + var nl []*network + + for _, store := range c.getStores() { + kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix), + &network{ctrlr: c}) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("failed to get networks for scope %s: %v", + store.Scope(), err) + } + + // Continue searching in the next store if no keys found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + for _, kvo := range kvol { + n := kvo.(*network) + n.ctrlr = c + nl = append(nl, n) + } + } + + return nl, nil +} + +func (n *network) getEndpointFromStore(eid string) (*endpoint, error) { + for _, store := range n.ctrlr.getStores() { + ep := &endpoint{id: eid, network: n} + err := store.GetObject(datastore.Key(ep.Key()...), ep) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, fmt.Errorf("could not find endpoint %s: %v", eid, err) + } + + // Continue searching in the next store if the key is not found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + return ep, nil + } + + return nil, fmt.Errorf("endpoint %s not found", eid) +} + +func (n *network) getEndpointsFromStore() ([]*endpoint, error) { + var epl []*endpoint + + tmp := endpoint{network: n} + for _, store := range n.getController().getStores() { + kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n}) + if err != nil && err != datastore.ErrKeyNotFound { + return nil, + fmt.Errorf("failed to get endpoints for network %s scope %s: %v", + n.Name(), store.Scope(), err) + } + + // Continue searching in the next store if no keys found in this store + if err == datastore.ErrKeyNotFound { + continue + } + + for _, kvo := range kvol { + ep := kvo.(*endpoint) + ep.network = n + epl = append(epl, ep) + } + } + + return epl, nil +} + +func (c *controller) updateToStore(kvObject datastore.KVObject) error { + cs := c.getStore(kvObject.DataScope()) + if cs == nil { + log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...)) + return nil + } + + if err := cs.PutObjectAtomic(kvObject); err != nil { + return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err) + } + + return nil +} + +func (c *controller) deleteFromStore(kvObject datastore.KVObject) error { + cs := c.getStore(kvObject.DataScope()) + if cs == nil { + log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...)) + return nil + } + +retry: + if err := cs.DeleteObjectAtomic(kvObject); err != nil { + if err == datastore.ErrKeyModified { + if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil { + return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err) + } + goto retry + } + return err + } + + return nil +} + +type netWatch struct { + localEps map[string]*endpoint + remoteEps map[string]*endpoint + stopCh chan struct{} +} + +func (c *controller) getLocalEps(nw *netWatch) []*endpoint { + c.Lock() + defer c.Unlock() + + var epl []*endpoint + for _, ep := range nw.localEps { + epl = append(epl, ep) + } + + return epl +} + +func (c *controller) watchSvcRecord(ep *endpoint) { + c.watchCh <- ep +} + +func (c *controller) unWatchSvcRecord(ep *endpoint) { + c.unWatchCh <- ep +} + +func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) { + for { + select { + case <-nw.stopCh: + return + case o := <-nCh: + n := o.(*network) + + epl, err := n.getEndpointsFromStore() + if err != nil { + break + } + + c.Lock() + var addEp []*endpoint + + delEpMap := make(map[string]*endpoint) + for k, v := range nw.remoteEps { + delEpMap[k] = v + } + + for _, lEp := range epl { + if _, ok := nw.localEps[lEp.ID()]; ok { + continue + } + + if _, ok := nw.remoteEps[lEp.ID()]; ok { + delete(delEpMap, lEp.ID()) + continue + } + + nw.remoteEps[lEp.ID()] = lEp + addEp = append(addEp, lEp) + + } + c.Unlock() + + for _, lEp := range addEp { + ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true) + } + + for _, lEp := range delEpMap { + ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false) + + } + } + } +} + +func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) { + c.Lock() + nw, ok := nmap[ep.getNetwork().ID()] + c.Unlock() + + if ok { + // Update the svc db for the local endpoint join right away + ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true) + + c.Lock() + nw.localEps[ep.ID()] = ep + c.Unlock() + return + } + + nw = &netWatch{ + localEps: make(map[string]*endpoint), + remoteEps: make(map[string]*endpoint), + } + + // Update the svc db for the local endpoint join right away + // Do this before adding this ep to localEps so that we don't + // try to update this ep's container's svc records + ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true) + + c.Lock() + nw.localEps[ep.ID()] = ep + nmap[ep.getNetwork().ID()] = nw + nw.stopCh = make(chan struct{}) + c.Unlock() + + store := c.getStore(ep.getNetwork().DataScope()) + if store == nil { + return + } + + if !store.Watchable() { + return + } + + ch, err := store.Watch(ep.getNetwork(), nw.stopCh) + if err != nil { + log.Warnf("Error creating watch for network: %v", err) + return + } + + go c.networkWatchLoop(nw, ep, ch) +} + +func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) { + c.Lock() + nw, ok := nmap[ep.getNetwork().ID()] + + if ok { + delete(nw.localEps, ep.ID()) + c.Unlock() + + // Update the svc db about local endpoint leave right away + // Do this after we remove this ep from localEps so that we + // don't try to remove this svc record from this ep's container. + ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), false) + + c.Lock() + if len(nw.localEps) == 0 { + close(nw.stopCh) + delete(nmap, ep.getNetwork().ID()) + } } c.Unlock() - return } -func (c *controller) processEndpointsUpdate(eps []*store.KVPair, prune *endpointTable) { - for _, epe := range eps { - var ep endpoint - err := json.Unmarshal(epe.Value, &ep) - if err != nil { - log.Error(err) - continue - } - if prune != nil { - delete(*prune, ep.id) - } - ep.SetIndex(epe.LastIndex) - if nid, err := ep.networkIDFromKey(epe.Key); err != nil { - log.Error(err) - continue - } else { - if n, err := c.NetworkByID(nid); err != nil { - log.Error(err) - continue - } else { - ep.network = n.(*network) - } - } - if c.processEndpointUpdate(&ep) { - err = c.newEndpointFromStore(epe.Key, &ep) - if err != nil { - log.Error(err) - } +func (c *controller) watchLoop(nmap map[string]*netWatch) { + for { + select { + case ep := <-c.watchCh: + c.processEndpointCreate(nmap, ep) + case ep := <-c.unWatchCh: + c.processEndpointDelete(nmap, ep) } } } + +func (c *controller) startWatch() { + c.watchCh = make(chan *endpoint) + c.unWatchCh = make(chan *endpoint) + nmap := make(map[string]*netWatch) + + go c.watchLoop(nmap) +} diff --git a/store_test.go b/store_test.go index 63777c0..f5e0871 100644 --- a/store_test.go +++ b/store_test.go @@ -33,7 +33,7 @@ func testNewController(t *testing.T, provider, url string) (NetworkController, e } func TestBoltdbBackend(t *testing.T) { - defer os.Remove(defaultLocalStoreConfig.Client.Address) + defer os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) testLocalBackend(t, "", "", nil) defer os.Remove("/tmp/boltdb.db") config := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second} @@ -64,7 +64,7 @@ func testLocalBackend(t *testing.T, provider, url string, storeConfig *store.Con if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.(*controller).localStore.KVStore() + store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore() if exists, err := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); !exists || err != nil { t.Fatalf("Network key should have been created.") } @@ -100,7 +100,7 @@ func TestNoPersist(t *testing.T) { if err != nil { t.Fatalf("Error creating endpoint: %v", err) } - store := ctrl.(*controller).localStore.KVStore() + store := ctrl.(*controller).getStore(datastore.LocalScope).KVStore() if exists, _ := store.Exists(datastore.Key(datastore.NetworkKeyPrefix, string(nw.ID()))); exists { t.Fatalf("Network with persist=false should not be stored in KV Store") } @@ -138,12 +138,8 @@ func TestLocalStoreLockTimeout(t *testing.T) { } defer ctrl1.Stop() // Use the same boltdb file without closing the previous controller - ctrl2, err := New(cfgOptions...) - if err != nil { - t.Fatalf("Error new controller: %v", err) - } - store := ctrl2.(*controller).localStore - if store != nil { - t.Fatalf("localstore is expected to be nil") + _, err = New(cfgOptions...) + if err == nil { + t.Fatalf("Expected to fail but succeeded") } }