diff --git a/client.go b/client.go index 324740cf..29f31a7d 100644 --- a/client.go +++ b/client.go @@ -72,11 +72,13 @@ type ClientOptions struct { // ClientOptionsFromConf attempts to load any relevant configuration options // from the given Hadoop configuration and create a ClientOptions struct -// suitable for creating a Client. Currently this sets the following fields -// on the resulting ClientOptions: +// suitable for creating a Client for the given nameservice. Currently this +// sets the following fields on the resulting ClientOptions: // -// // Determined by fs.defaultFS (or the deprecated fs.default.name), or -// // fields beginning with dfs.namenode.rpc-address. +// // Determined by the value of the property named +// // "dfs.namenode.rpc-address."+ns if using a non-HA federated +// // architecture, or "dfs.namenode.rpc-address."+ns+"."+nnid for each nnid +// // found in "dfs.ha.namenodes."+ns // Addresses []string // // // Determined by dfs.client.use.datanode.hostname. @@ -95,14 +97,30 @@ type ClientOptions struct { // actually configured, you should check for whether KerberosClient is set in // the resulting ClientOptions before proceeding: // -// options := ClientOptionsFromConf(conf) +// options := ClientOptionsFromConf(conf, "mynameservice") // if options.KerberosClient != nil { // // Replace with a valid credentialed client. // options.KerberosClient = getKerberosClient() // } -func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions { - options := ClientOptions{Addresses: conf.Namenodes()} +func ClientOptionsFromConf(conf hadoopconf.HadoopConf, ns string) ClientOptions { + options := commonClientOptionsFromConf(conf) + options.Addresses = conf.Namenodes(ns) + return options +} + +// DefaultClientOptionsFromConf behaves similarly to ClientOptionsFromConf +// except it uses the nameservice defined in fs.defaultFS (or the deprecated +// fs.default.name) for finding the namenode addresses. If both of these +// properties are absent, then it uses the address defined by +// dfs.namenode.rpc-address. +func DefaultClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions { + options := commonClientOptionsFromConf(conf) + options.Addresses = conf.DefaultNamenodes() + return options +} +func commonClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions { + options := ClientOptions{} options.UseDatanodeHostname = (conf["dfs.client.use.datanode.hostname"] == "true") if strings.ToLower(conf["hadoop.security.authentication"]) == "kerberos" { @@ -156,7 +174,7 @@ func NewClient(options ClientOptions) (*Client, error) { // (including the address(es) of the namenode(s), if an empty string is passed) // will be loaded from the Hadoop configuration present at HADOOP_CONF_DIR or // HADOOP_HOME, as specified by hadoopconf.LoadFromEnvironment and -// ClientOptionsFromConf. +// DefaultClientOptionsFromConf. // // Note, however, that New will not attempt any Kerberos authentication; use // NewClient if you need that. @@ -166,7 +184,7 @@ func New(address string) (*Client, error) { return nil, err } - options := ClientOptionsFromConf(conf) + options := DefaultClientOptionsFromConf(conf) if address != "" { options.Addresses = strings.Split(address, ",") } diff --git a/client_test.go b/client_test.go index 3966f0f4..32707021 100644 --- a/client_test.go +++ b/client_test.go @@ -41,7 +41,7 @@ func getClientForUser(t *testing.T, username string) *Client { t.Fatal("Couldn't load ambient config", err) } - options := ClientOptionsFromConf(conf) + options := DefaultClientOptionsFromConf(conf) if options.Addresses == nil { t.Fatal("Missing namenode addresses in ambient config") } @@ -149,7 +149,7 @@ func TestNewWithMultipleNodes(t *testing.T) { t.Fatal("Couldn't load ambient config", err) } - nns := conf.Namenodes() + nns := conf.DefaultNamenodes() nns = append([]string{"localhost:100"}, nns...) _, err = NewClient(ClientOptions{Addresses: nns, User: "gohdfs1"}) diff --git a/cmd/hdfs/main.go b/cmd/hdfs/main.go index dc7f0b47..bc5013b5 100755 --- a/cmd/hdfs/main.go +++ b/cmd/hdfs/main.go @@ -187,7 +187,7 @@ func getClient(namenode string) (*hdfs.Client, error) { return nil, fmt.Errorf("Problem loading configuration: %s", err) } - options := hdfs.ClientOptionsFromConf(conf) + options := hdfs.DefaultClientOptionsFromConf(conf) if namenode != "" { options.Addresses = []string{namenode} } diff --git a/hadoopconf/hadoopconf.go b/hadoopconf/hadoopconf.go index f9a6fc2b..8123a919 100644 --- a/hadoopconf/hadoopconf.go +++ b/hadoopconf/hadoopconf.go @@ -90,43 +90,72 @@ func Load(path string) (HadoopConf, error) { return conf, nil } -// Namenodes returns the namenode hosts present in the configuration. The -// returned slice will be sorted and deduped. The values are loaded from -// fs.defaultFS (or the deprecated fs.default.name), or fields beginning with -// dfs.namenode.rpc-address. -// -// To handle 'logical' clusters Namenodes will not return any cluster names -// found in dfs.ha.namenodes. properties. -// -// If no namenode addresses can befound, Namenodes returns a nil slice. -func (conf HadoopConf) Namenodes() []string { - nns := make(map[string]bool) - var clusterNames []string - - for key, value := range conf { - if strings.Contains(key, "fs.default") { - nnUrl, _ := url.Parse(value) - nns[nnUrl.Host] = true - } else if strings.HasPrefix(key, "dfs.namenode.rpc-address.") { - nns[value] = true - } else if strings.HasPrefix(key, "dfs.ha.namenodes.") { - clusterNames = append(clusterNames, key[len("dfs.ha.namenodes."):]) +// DefaultNamenodes returns the namenodes that should be used given the +// configuration's fs.defaultFS (or deprecated fs.default.name) property. If no +// such property is found, i.e. if the configuration is using neither federated +// namespaces nor high availability, then the dfs.namenode.rpc-address property +// is returned if present. Otherwise, a nil slice is returned. +func (conf HadoopConf) DefaultNamenodes() []string { + if fs, ok := conf["fs.defaultFS"]; ok { + // check if default nameservice is defined + fsurl, _ := url.Parse(fs) + if fsurl == nil { + return nil } + return conf.Namenodes(fsurl.Host) + } else if ns, ok := conf["fs.default.name"]; ok { + // check if default nameservice is defined (through deprecated name) + return conf.Namenodes(ns) + } else if nn, ok := conf["dfs.namenode.rpc-address"]; ok { + // non-HA and non-federated config; return single namenode + return []string{nn} + } else { + // no namenodes found at all + return nil } +} - for _, cn := range clusterNames { - delete(nns, cn) - } +// namenodesPerNS returns a mapping from clusters to the namenode(s) in those +// clusters. +func (conf HadoopConf) namenodesPerNS() map[string][]string { + nns := make(map[string][]string) + var clusterNames []string - if len(nns) == 0 { - return nil + // this property is required for high availability and/or federation. if + // it's not set, the configuration must be using a non-federated and non-HA + // architecture. check if the property is defined before updating + // clusterNames because strings.Split will return a non-empty slice given + // an empty string, covering up the distinction between no dfs.nameservices + // given and an empty dfs.nameservices. + if nameservices, ok := conf["dfs.nameservices"]; ok { + clusterNames = append(clusterNames, strings.Split(nameservices, ",")...) } - keys := make([]string, 0, len(nns)) - for k, _ := range nns { - keys = append(keys, k) + // obtain logical namenode ids per nameservice + for _, ns := range clusterNames { + nnids, ha := conf["dfs.ha.namenodes."+ns] + if !ha { + // non-HA federated architecture + if nn, ok := conf["dfs.namenode.rpc-address."+ns]; ok { + nns[ns] = append(nns[ns], nn) + } + } else { + // HA architecture + for _, nnid := range strings.Split(nnids, ",") { + if nn, ok := conf["dfs.namenode.rpc-address."+ns+"."+nnid]; ok { + nns[ns] = append(nns[ns], nn) + } + } + sort.Strings(nns[ns]) + } } - sort.Strings(keys) - return keys + return nns +} + +// Namenodes returns the namenode hosts present in the configuration for the +// given nameservice. The returned slice will be sorted. If no namenode +// addresses can be found, Namenodes returns a nil slice. +func (conf HadoopConf) Namenodes(ns string) []string { + return conf.namenodesPerNS()[ns] } diff --git a/hadoopconf/hadoopconf_test.go b/hadoopconf/hadoopconf_test.go index abf81356..972b08aa 100644 --- a/hadoopconf/hadoopconf_test.go +++ b/hadoopconf/hadoopconf_test.go @@ -19,7 +19,7 @@ func TestConfFallback(t *testing.T) { conf, err := LoadFromEnvironment() assert.NoError(t, err) - nns := conf.Namenodes() + nns := conf.DefaultNamenodes() assert.NoError(t, err) assert.EqualValues(t, conf2Namenodes, nns, "loading via HADOOP_CONF_DIR (testdata/conf2)") @@ -28,7 +28,7 @@ func TestConfFallback(t *testing.T) { conf, err = LoadFromEnvironment() assert.NoError(t, err) - nns = conf.Namenodes() + nns = conf.DefaultNamenodes() assert.NoError(t, err) assert.EqualValues(t, confNamenodes, nns, "loading via HADOOP_HOME (testdata/conf)")