From f4da12194b1287bfc6c839f149bbf90310f06988 Mon Sep 17 00:00:00 2001 From: sloblee Date: Wed, 11 Jan 2017 11:55:09 +0800 Subject: [PATCH 1/5] change the default etcd address to "http://127.0.0.1:2379" --- internal/flagconfig/flagconfig.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/flagconfig/flagconfig.go b/internal/flagconfig/flagconfig.go index 180e8c5..f10c82d 100644 --- a/internal/flagconfig/flagconfig.go +++ b/internal/flagconfig/flagconfig.go @@ -21,7 +21,7 @@ import ( const ( defaultTorusConfig = "/.torus/config.json" - defaultEtcdAddress = "127.0.0.1:2379" + defaultEtcdAddress = "http://127.0.0.1:2379" ) var ( From 39294dd7ffd20ebc6ed6d37ac545f7762bc7fa6a Mon Sep 17 00:00:00 2001 From: Edward Date: Thu, 27 Apr 2017 19:57:45 +0800 Subject: [PATCH 2/5] remove the list-peers command that can be replaced by peers list command --- cmd/torusctl/peer.go | 85 ++++++++++++++++++++++++++++++++++++++-- cmd/torusctl/torusctl.go | 5 ++- 2 files changed, 86 insertions(+), 4 deletions(-) mode change 100644 => 100755 cmd/torusctl/peer.go mode change 100644 => 100755 cmd/torusctl/torusctl.go diff --git a/cmd/torusctl/peer.go b/cmd/torusctl/peer.go old mode 100644 new mode 100755 index 652a5e1..e4afc0c --- a/cmd/torusctl/peer.go +++ b/cmd/torusctl/peer.go @@ -1,17 +1,22 @@ package main import ( + "fmt" "os" + "time" "github.com/coreos/torus" "github.com/coreos/torus/models" + "github.com/dustin/go-humanize" "github.com/spf13/cobra" ) var ( - newPeers torus.PeerInfoList - allPeers bool - force bool + newPeers torus.PeerInfoList + allPeers bool + force bool + outputAsCSV bool + outputAsSI bool ) var peerCommand = &cobra.Command{ @@ -44,6 +49,8 @@ func init() { peerCommand.AddCommand(peerAddCommand, peerRemoveCommand, peerListCommand) peerAddCommand.Flags().BoolVar(&allPeers, "all-peers", false, "add all peers") peerRemoveCommand.PersistentFlags().BoolVar(&force, "force", false, "force-remove a UUID") + peerListCommand.Flags().BoolVarP(&outputAsCSV, "csv", "", false, "output as csv instead") + peerListCommand.Flags().BoolVarP(&outputAsSI, "si", "", false, "output sizes in powers of 1000") } func peerAction(cmd *cobra.Command, args []string) { @@ -141,3 +148,75 @@ func peerRemoveAction(cmd *cobra.Command, args []string) { die("couldn't set new ring: %v", err) } } + +func listPeersAction(cmd *cobra.Command, args []string) { + var totalStorage uint64 + var usedStorage uint64 + + mds := mustConnectToMDS() + gmd := mds.GlobalMetadata() + peers, err := mds.GetPeers() + if err != nil { + die("couldn't get peers: %v", err) + } + ring, err := mds.GetRing() + if err != nil { + die("couldn't get ring: %v", err) + } + members := ring.Members() + table := NewTableWriter(os.Stdout) + table.SetHeader([]string{"Address", "UUID", "Size", "Used", "Member", "Updated", "Reb/Rep Data"}) + rebalancing := false + for _, x := range peers { + ringStatus := "Avail" + if x.Address == "" { + continue + } + if members.Has(x.UUID) { + ringStatus = "OK" + } + table.Append([]string{ + x.Address, + x.UUID, + bytesOrIbytes(x.TotalBlocks*gmd.BlockSize, outputAsSI), + bytesOrIbytes(x.UsedBlocks*gmd.BlockSize, outputAsSI), + ringStatus, + humanize.Time(time.Unix(0, x.LastSeen)), + bytesOrIbytes(x.RebalanceInfo.LastRebalanceBlocks*gmd.BlockSize*uint64(time.Second)/uint64(x.LastSeen+1-x.RebalanceInfo.LastRebalanceFinish), outputAsSI) + "/sec", + }) + if x.RebalanceInfo.Rebalancing { + rebalancing = true + } + totalStorage += x.TotalBlocks * gmd.BlockSize + usedStorage += x.UsedBlocks * gmd.BlockSize + } + + for _, x := range members { + ringStatus := "DOWN" + ok := false + for _, p := range peers { + if p.UUID == x { + ok = true + break + } + } + if ok { + continue + } + table.Append([]string{ + "", + x, + "???", + "???", + ringStatus, + "Missing", + "", + }) + } + if outputAsCSV { + table.RenderCSV() + } else { + table.Render() + fmt.Printf("Balanced: %v Usage: %5.2f%%\n", !rebalancing, (float64(usedStorage) / float64(totalStorage) * 100.0)) + } +} diff --git a/cmd/torusctl/torusctl.go b/cmd/torusctl/torusctl.go old mode 100644 new mode 100755 index 83b6c6d..acb037a --- a/cmd/torusctl/torusctl.go +++ b/cmd/torusctl/torusctl.go @@ -35,10 +35,11 @@ var versionCommand = &cobra.Command{ } func init() { + fmt.Println("start to init MDS...") rootCommand.PersistentFlags().BoolVarP(&debug, "debug", "", false, "enable debug logging") rootCommand.AddCommand(initCommand) rootCommand.AddCommand(blockCommand) - rootCommand.AddCommand(listPeersCommand) + // rootCommand.AddCommand(listPeersCommand) rootCommand.AddCommand(ringCommand) rootCommand.AddCommand(peerCommand) rootCommand.AddCommand(volumeCommand) @@ -47,6 +48,8 @@ func init() { rootCommand.AddCommand(configCommand) rootCommand.AddCommand(completionCommand) flagconfig.AddConfigFlags(rootCommand.PersistentFlags()) + + fmt.Println("MDS init finished...") } func main() { From 1d8891f601794d1fa122f01fa11e52f152dd6691 Mon Sep 17 00:00:00 2001 From: Edward Date: Fri, 28 Apr 2017 14:36:25 +0800 Subject: [PATCH 3/5] remove the list-peers command instend of peers list --- cmd/torusctl/list-peers.go | 98 -------------------------------------- 1 file changed, 98 deletions(-) delete mode 100644 cmd/torusctl/list-peers.go diff --git a/cmd/torusctl/list-peers.go b/cmd/torusctl/list-peers.go deleted file mode 100644 index 6f3fefe..0000000 --- a/cmd/torusctl/list-peers.go +++ /dev/null @@ -1,98 +0,0 @@ -package main - -import ( - "fmt" - "os" - "time" - - "github.com/dustin/go-humanize" - "github.com/spf13/cobra" -) - -var ( - outputAsCSV bool - outputAsSI bool -) - -var listPeersCommand = &cobra.Command{ - Use: "list-peers", - Short: "show the active storage peers in the cluster", - Run: listPeersAction, -} - -func init() { - listPeersCommand.Flags().BoolVarP(&outputAsCSV, "csv", "", false, "output as csv instead") - listPeersCommand.Flags().BoolVarP(&outputAsSI, "si", "", false, "output sizes in powers of 1000") -} - -func listPeersAction(cmd *cobra.Command, args []string) { - var totalStorage uint64 - var usedStorage uint64 - - mds := mustConnectToMDS() - gmd := mds.GlobalMetadata() - peers, err := mds.GetPeers() - if err != nil { - die("couldn't get peers: %v", err) - } - ring, err := mds.GetRing() - if err != nil { - die("couldn't get ring: %v", err) - } - members := ring.Members() - table := NewTableWriter(os.Stdout) - table.SetHeader([]string{"Address", "UUID", "Size", "Used", "Member", "Updated", "Reb/Rep Data"}) - rebalancing := false - for _, x := range peers { - ringStatus := "Avail" - if x.Address == "" { - continue - } - if members.Has(x.UUID) { - ringStatus = "OK" - } - table.Append([]string{ - x.Address, - x.UUID, - bytesOrIbytes(x.TotalBlocks*gmd.BlockSize, outputAsSI), - bytesOrIbytes(x.UsedBlocks*gmd.BlockSize, outputAsSI), - ringStatus, - humanize.Time(time.Unix(0, x.LastSeen)), - bytesOrIbytes(x.RebalanceInfo.LastRebalanceBlocks*gmd.BlockSize*uint64(time.Second)/uint64(x.LastSeen+1-x.RebalanceInfo.LastRebalanceFinish), outputAsSI) + "/sec", - }) - if x.RebalanceInfo.Rebalancing { - rebalancing = true - } - totalStorage += x.TotalBlocks * gmd.BlockSize - usedStorage += x.UsedBlocks * gmd.BlockSize - } - - for _, x := range members { - ringStatus := "DOWN" - ok := false - for _, p := range peers { - if p.UUID == x { - ok = true - break - } - } - if ok { - continue - } - table.Append([]string{ - "", - x, - "???", - "???", - ringStatus, - "Missing", - "", - }) - } - if outputAsCSV { - table.RenderCSV() - } else { - table.Render() - fmt.Printf("Balanced: %v Usage: %5.2f%%\n", !rebalancing, (float64(usedStorage) / float64(totalStorage) * 100.0)) - } -} From 352596268511833430731bd9fdd10666aadc4918 Mon Sep 17 00:00:00 2001 From: Edward Date: Fri, 28 Apr 2017 14:39:44 +0800 Subject: [PATCH 4/5] change the default log level to trace. this change is only for my debug. --- cmd/torusblk/main.go | 4 ++-- cmd/torusctl/init.go | 2 ++ cmd/torusd/main.go | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) mode change 100644 => 100755 cmd/torusblk/main.go mode change 100644 => 100755 cmd/torusctl/init.go mode change 100644 => 100755 cmd/torusd/main.go diff --git a/cmd/torusblk/main.go b/cmd/torusblk/main.go old mode 100644 new mode 100755 index 9029e9c..4f4472d --- a/cmd/torusblk/main.go +++ b/cmd/torusblk/main.go @@ -69,10 +69,10 @@ func configureServer(cmd *cobra.Command, args []string) { case debug: capnslog.SetGlobalLogLevel(capnslog.DEBUG) default: - capnslog.SetGlobalLogLevel(capnslog.INFO) + capnslog.SetGlobalLogLevel(capnslog.TRACE) } if logpkg != "" { - capnslog.SetGlobalLogLevel(capnslog.NOTICE) + capnslog.SetGlobalLogLevel(capnslog.TRACE) rl := capnslog.MustRepoLogger("github.com/coreos/torus") llc, err := rl.ParseLogLevelConfig(logpkg) if err != nil { diff --git a/cmd/torusctl/init.go b/cmd/torusctl/init.go old mode 100644 new mode 100755 index a06069d..f45a720 --- a/cmd/torusctl/init.go +++ b/cmd/torusctl/init.go @@ -71,6 +71,8 @@ func initAction(cmd *cobra.Command, args []string) { if err != nil { die("error writing metadata: %v", err) } + + fmt.Println("this mds init sucessed") } func viewMetadata() { diff --git a/cmd/torusd/main.go b/cmd/torusd/main.go old mode 100644 new mode 100755 index 75b2681..8d961f2 --- a/cmd/torusd/main.go +++ b/cmd/torusd/main.go @@ -86,10 +86,10 @@ func configureServer(cmd *cobra.Command, args []string) { case debug: capnslog.SetGlobalLogLevel(capnslog.DEBUG) default: - capnslog.SetGlobalLogLevel(capnslog.INFO) + capnslog.SetGlobalLogLevel(capnslog.TRACE) } if logpkg != "" { - capnslog.SetGlobalLogLevel(capnslog.NOTICE) + capnslog.SetGlobalLogLevel(capnslog.TRACE) rl := capnslog.MustRepoLogger("github.com/coreos/torus") llc, err := rl.ParseLogLevelConfig(logpkg) if err != nil { From 01253f6f6ff6ff7bcfe45d4b1f588ff1e454b3dc Mon Sep 17 00:00:00 2001 From: Edward Date: Fri, 28 Apr 2017 14:46:14 +0800 Subject: [PATCH 5/5] add some comments to method for easy understand. --- block/blockfile.go | 4 ++ cliconfig/config.go | 0 distributor/client.go | 2 +- distributor/lru.go | 4 ++ distributor/rebalance.go | 2 +- heartbeat.go | 0 internal/nbd/nbd.go | 5 ++ internal/tcmu/connect.go | 4 ++ local_server.go | 3 +- metadata/etcd/etcd.go | 92 +++++++++++++---------------------- metadata/etcd/global_funcs.go | 8 +++ metadata/etcd/helpers.go | 6 +++ metadata/etcd/key_modify.go | 62 +++++++++++++++++++++++ metadata/etcd/ring_watch.go | 2 + models/rpc.pb.go | 2 +- ring/ring_main.go | 3 ++ storage/mfile.go | 5 ++ 17 files changed, 142 insertions(+), 62 deletions(-) mode change 100644 => 100755 block/blockfile.go mode change 100644 => 100755 cliconfig/config.go mode change 100644 => 100755 distributor/client.go mode change 100644 => 100755 distributor/lru.go mode change 100644 => 100755 distributor/rebalance.go mode change 100644 => 100755 heartbeat.go mode change 100644 => 100755 internal/nbd/nbd.go mode change 100644 => 100755 internal/tcmu/connect.go mode change 100644 => 100755 local_server.go mode change 100644 => 100755 metadata/etcd/etcd.go mode change 100644 => 100755 metadata/etcd/global_funcs.go mode change 100644 => 100755 metadata/etcd/helpers.go create mode 100755 metadata/etcd/key_modify.go mode change 100644 => 100755 metadata/etcd/ring_watch.go mode change 100644 => 100755 ring/ring_main.go mode change 100644 => 100755 storage/mfile.go diff --git a/block/blockfile.go b/block/blockfile.go old mode 100644 new mode 100755 index d11d4fc..393ce27 --- a/block/blockfile.go +++ b/block/blockfile.go @@ -6,6 +6,10 @@ import ( "golang.org/x/net/context" ) +/** +**using the torus file as the block device +**it extend from File and contains the BlockVolume + */ type BlockFile struct { *torus.File vol *BlockVolume diff --git a/cliconfig/config.go b/cliconfig/config.go old mode 100644 new mode 100755 diff --git a/distributor/client.go b/distributor/client.go old mode 100644 new mode 100755 index 8f59708..4bb2955 --- a/distributor/client.go +++ b/distributor/client.go @@ -48,8 +48,8 @@ func (d *distClient) onPeerTimeout(uuid string) { clog.Errorf("peer timeout err on close: %s", err) } delete(d.openConns, uuid) - } + func (d *distClient) getConn(uuid string) protocols.RPC { d.mut.Lock() if conn, ok := d.openConns[uuid]; ok { diff --git a/distributor/lru.go b/distributor/lru.go old mode 100644 new mode 100755 index f99907d..5a4bb45 --- a/distributor/lru.go +++ b/distributor/lru.go @@ -1,3 +1,4 @@ +//implementation the LRU cache package distributor import ( @@ -26,12 +27,15 @@ func newCache(size int) *cache { return &lru } +//put a key-value pair into cache, nothing to be done if the key exist func (lru *cache) Put(key string, value interface{}) { if lru == nil { return } lru.mut.Lock() defer lru.mut.Unlock() + + //if the key exist, nothing to be done if _, ok := lru.get(key); ok { return } diff --git a/distributor/rebalance.go b/distributor/rebalance.go old mode 100644 new mode 100755 index 450557d..176c683 --- a/distributor/rebalance.go +++ b/distributor/rebalance.go @@ -46,7 +46,7 @@ func (d *Distributor) rebalanceTicker(closer chan struct{}) { time.Sleep(time.Duration(250+rand.Intn(250)) * time.Millisecond) exit: for { - clog.Tracef("starting rebalance/gc cycle") + // clog.Tracef("starting rebalance/gc cycle") volset, _, err := d.srv.MDS.GetVolumes() if err != nil { clog.Error(err) diff --git a/heartbeat.go b/heartbeat.go old mode 100644 new mode 100755 diff --git a/internal/nbd/nbd.go b/internal/nbd/nbd.go old mode 100644 new mode 100755 index 5efe342..d3b10d4 --- a/internal/nbd/nbd.go +++ b/internal/nbd/nbd.go @@ -133,6 +133,11 @@ func (nbd *NBD) SetBlockSize(blocksize int64) error { return nil } +/** +*** This method is to find the available nbd* device +*** according to the device name to find if the /dev folder has this "file" +*** if the file is exist, then to check the /sys/block/nbd#/pid + */ func FindDevice() (string, error) { // FIXME: Oh god... fixme. // find free nbd device diff --git a/internal/tcmu/connect.go b/internal/tcmu/connect.go old mode 100644 new mode 100755 index ae68196..e9d10e6 --- a/internal/tcmu/connect.go +++ b/internal/tcmu/connect.go @@ -61,8 +61,10 @@ type torusHandler struct { func (h *torusHandler) HandleCommand(cmd *tcmu.SCSICmd) (tcmu.SCSIResponse, error) { switch cmd.Command() { + //query the storage basic information: verdor name, product name, product version case scsi.Inquiry: return tcmu.EmulateInquiry(cmd, h.inq) + //the signal sent by PC to make sure the storage device is stil alive. case scsi.TestUnitReady: return tcmu.EmulateTestUnitReady(cmd) case scsi.ServiceActionIn16: @@ -71,8 +73,10 @@ func (h *torusHandler) HandleCommand(cmd *tcmu.SCSICmd) (tcmu.SCSIResponse, erro return tcmu.EmulateModeSense(cmd, true) case scsi.ModeSelect, scsi.ModeSelect10: return tcmu.EmulateModeSelect(cmd, true) + //Read10: sent by PC, to query the data from offset and length. case scsi.Read6, scsi.Read10, scsi.Read12, scsi.Read16: return tcmu.EmulateRead(cmd, h.file) + //write10:write the data from PC into the specified address[offset, length] case scsi.Write6, scsi.Write10, scsi.Write12, scsi.Write16: return h.handleWrite(cmd) case scsi.SynchronizeCache, scsi.SynchronizeCache16: diff --git a/local_server.go b/local_server.go old mode 100644 new mode 100755 index 5ce00cb..3112668 --- a/local_server.go +++ b/local_server.go @@ -27,7 +27,8 @@ func MkdirsFor(dir string) error { } func NewServer(cfg Config, metadataServiceKind, blockStoreKind string) (*Server, error) { - err := MkdirsFor(cfg.DataDir) + + err := MkdirsFor(cfg.DataDir) //create two folders under the data-dirs, one is "metadata",anther is "block" if err != nil { return nil, err } diff --git a/metadata/etcd/etcd.go b/metadata/etcd/etcd.go old mode 100644 new mode 100755 index 3ac0725..321d077 --- a/metadata/etcd/etcd.go +++ b/metadata/etcd/etcd.go @@ -77,6 +77,7 @@ func newEtcdMetadata(cfg torus.Config) (torus.MetadataService, error) { uuid = metadata.MakeUUID() } else { uuid, err = metadata.GetUUID(cfg.DataDir) + //create an uuid file under the /metadata folder and write one number which generated by uuid into it } if err != nil { return nil, err @@ -119,6 +120,9 @@ func (e *Etcd) Close() error { return e.Client.Close() } +//read the Global metadata string from etcd server with key +//"/github.com/coreos/torus/meta/globalmetadata" +//set global value via Unmarshal as a GlobalMetadata struct func (e *Etcd) getGlobalMetadata() error { txn := e.Client.Txn(context.Background()) resp, err := txn.If( @@ -142,6 +146,7 @@ func (e *Etcd) getGlobalMetadata() error { return nil } +//the only thing is to clone the etcdCtx func (e *Etcd) WithContext(ctx context.Context) torus.MetadataService { return &etcdCtx{ etcd: e, @@ -149,12 +154,14 @@ func (e *Etcd) WithContext(ctx context.Context) torus.MetadataService { } } +//add one ring listener that will get the new ring when the ring updated func (e *Etcd) SubscribeNewRings(ch chan torus.Ring) { e.mut.Lock() defer e.mut.Unlock() e.ringListeners = append(e.ringListeners, ch) } +//remove the exist listener func (e *Etcd) UnsubscribeNewRings(ch chan torus.Ring) { e.mut.Lock() defer e.mut.Unlock() @@ -167,6 +174,7 @@ func (e *Etcd) UnsubscribeNewRings(ch chan torus.Ring) { // Context-sensitive calls +//return the ctx value, the context.Background will be returned when value is nil func (c *etcdCtx) getContext() context.Context { if c.ctx == nil { return context.Background() @@ -174,6 +182,7 @@ func (c *etcdCtx) getContext() context.Context { return c.ctx } +//the same as Etcd WithContext to clone one EtcdCtx func (c *etcdCtx) WithContext(ctx context.Context) torus.MetadataService { return c.etcd.WithContext(ctx) } @@ -190,6 +199,7 @@ func (c *etcdCtx) UUID() string { return c.etcd.uuid } +//add the PeerInfo with lease into etcd server func (c *etcdCtx) RegisterPeer(lease int64, p *models.PeerInfo) error { if lease == 0 { return errors.New("no lease") @@ -231,62 +241,6 @@ func (c *etcdCtx) GetPeers() (torus.PeerInfoList, error) { return torus.PeerInfoList(out), nil } -// AtomicModifyFunc is a class of commutative functions that, given the current -// state of a key's value `in`, returns the new state of the key `out`, and -// `data` to be returned to the calling function on success, or an `err`. -// -// This function may be run multiple times, if the value has changed in the time -// between getting the data and setting the new value. -type AtomicModifyFunc func(in []byte) (out []byte, data interface{}, err error) - -// TODO(barakmich): Perhaps make this an etcd client library function. -func (c *etcdCtx) AtomicModifyKey(k []byte, f AtomicModifyFunc) (interface{}, error) { - key := string(k) - resp, err := c.etcd.Client.Get(c.getContext(), key) - if err != nil { - return nil, err - } - var version int64 - var value []byte - if len(resp.Kvs) != 1 { - version = 0 - value = []byte{} - } else { - kv := resp.Kvs[0] - version = kv.Version - value = kv.Value - } - for { - newBytes, fval, err := f(value) - if err != nil { - return nil, err - } - txn := c.etcd.Client.Txn(c.getContext()).If( - etcdv3.Compare(etcdv3.Version(key), "=", version), - ).Then( - etcdv3.OpPut(key, string(newBytes)), - ).Else( - etcdv3.OpGet(key), - ) - resp, err := txn.Commit() - if err != nil { - return nil, err - } - if resp.Succeeded { - return fval, nil - } - promAtomicRetries.WithLabelValues(string(key)).Inc() - kv := resp.Responses[0].GetResponseRange().Kvs[0] - version = kv.Version - value = kv.Value - } -} - -func BytesAddOne(in []byte) ([]byte, interface{}, error) { - newval := BytesToUint64(in) + 1 - return Uint64ToBytes(newval), newval, nil -} - func (c *etcdCtx) GetVolumes() ([]*models.Volume, torus.VolumeID, error) { promOps.WithLabelValues("get-volumes").Inc() txn := c.etcd.Client.Txn(c.getContext()).Then( @@ -311,6 +265,9 @@ func (c *etcdCtx) GetVolumes() ([]*models.Volume, torus.VolumeID, error) { return out, torus.VolumeID(highwater), nil } +//get volume from the metadata store with the volume name. +//get the volume id via request "/github.com/coreos/torus/volumes/" +//get volume information via request "/github.com/coreos/torus/volumes/" func (c *etcdCtx) GetVolume(volume string) (*models.Volume, error) { if v, ok := c.etcd.volumesCache[volume]; ok { return v, nil @@ -343,6 +300,8 @@ func (c *etcdCtx) GetVolume(volume string) (*models.Volume, error) { return v, nil } +//query the volume lock status. +//get value via request "/github.com/coreos/torus/volumemeta//blocklock" func (c *etcdCtx) GetLockStatus(vid uint64) string { resp, err := c.etcd.Client.Get(c.getContext(), MkKey("volumemeta", Uint64ToHex(uint64(vid)), "blocklock")) if err != nil { @@ -355,6 +314,7 @@ func (c *etcdCtx) GetLockStatus(vid uint64) string { return "in-use" } +//query the client lease func (c *etcdCtx) GetLease() (int64, error) { resp, err := c.etcd.Client.Grant(c.getContext(), leaseTTL) if err != nil { @@ -364,22 +324,28 @@ func (c *etcdCtx) GetLease() (int64, error) { return int64(resp.ID), nil } +//set the new lease value for client func (c *etcdCtx) RenewLease(lease int64) error { lid := etcdv3.LeaseID(lease) - resp, err := c.etcd.Client.KeepAliveOnce(c.getContext(), lid) + // resp, err := c.etcd.Client.KeepAliveOnce(c.getContext(), lid) + _, err := c.etcd.Client.KeepAliveOnce(c.getContext(), lid) if err != nil { if err == rpctypes.ErrLeaseNotFound { return torus.ErrLeaseNotFound } return err } - clog.Tracef("updated lease for %d, TTL %d", resp.ID, resp.TTL) + // clog.Tracef("updated lease for %d, TTL %d", resp.ID, resp.TTL) return nil } + +//query the ring via EtcdCtx getRing() func (c *etcdCtx) GetRing() (torus.Ring, error) { r, _, err := c.getRing() return r, err } + +//query the ring via request "/github.com/coreos/torus/meta/the-one-ring" func (c *etcdCtx) getRing() (torus.Ring, int64, error) { promOps.WithLabelValues("get-ring").Inc() resp, err := c.etcd.Client.Get(c.getContext(), MkKey("meta", "the-one-ring")) @@ -396,14 +362,17 @@ func (c *etcdCtx) getRing() (torus.Ring, int64, error) { return ring, resp.Kvs[0].Version, nil } +//call the Etcd SubscribeNewRings() func (c *etcdCtx) SubscribeNewRings(ch chan torus.Ring) { c.etcd.SubscribeNewRings(ch) } +//call the Etcd UnsubscribeNewRings() func (c *etcdCtx) UnsubscribeNewRings(ch chan torus.Ring) { c.etcd.UnsubscribeNewRings(ch) } +//set the new ring value via comparing the version value func (c *etcdCtx) SetRing(ring torus.Ring) error { oldr, etcdver, err := c.getRing() if err != nil { @@ -432,6 +401,9 @@ func (c *etcdCtx) SetRing(ring torus.Ring) error { return torus.ErrAgain } +//generate an inode id. +//transfer the "/github.com/coreos/torus/volumemeta//inode" into byte array +//then run byte add 1 then right this value into metadata server func (c *etcdCtx) CommitINodeIndex(vid torus.VolumeID) (torus.INodeID, error) { promOps.WithLabelValues("commit-inode-index").Inc() c.etcd.mut.Lock() @@ -444,6 +416,9 @@ func (c *etcdCtx) CommitINodeIndex(vid torus.VolumeID) (torus.INodeID, error) { return torus.INodeID(newID.(uint64)), nil } +//generate an inode id. +//transfer the "/github.com/coreos/torus/meta/volumeminter" into byte array +//then write the value generated via adding 1 onto the byte array into metadata func (c *etcdCtx) NewVolumeID() (torus.VolumeID, error) { c.etcd.mut.Lock() defer c.etcd.mut.Unlock() @@ -455,6 +430,7 @@ func (c *etcdCtx) NewVolumeID() (torus.VolumeID, error) { return torus.VolumeID(newID.(uint64)), nil } +//get the INode Id from metadata server func (c *etcdCtx) GetINodeIndex(vid torus.VolumeID) (torus.INodeID, error) { promOps.WithLabelValues("get-inode-index").Inc() c.etcd.mut.Lock() diff --git a/metadata/etcd/global_funcs.go b/metadata/etcd/global_funcs.go old mode 100644 new mode 100755 index 86cd52c..44a7143 --- a/metadata/etcd/global_funcs.go +++ b/metadata/etcd/global_funcs.go @@ -11,6 +11,14 @@ import ( "golang.org/x/net/context" ) +//this the etcd global functions, includes three functions +//**initEtcdMetadata() is to write the basic metadata into Etcd server, include +// 1. /github.com/coreos/torus/meta/volumeminter +// 2. /github.com/coreos/torus/meta/globalmetadata +// 3. /github.com/coreos/torus/meta/the-one-ring +//**wipeEtcdMetadata() will clean information setted by abover function +//**setRing() will change the /github.com/coreos/torus/meta/the-one-ring value + func initEtcdMetadata(cfg torus.Config, gmd torus.GlobalMetadata, ringType torus.RingType) error { gmdbytes, err := json.Marshal(gmd) if err != nil { diff --git a/metadata/etcd/helpers.go b/metadata/etcd/helpers.go old mode 100644 new mode 100755 index 572aadb..7138695 --- a/metadata/etcd/helpers.go +++ b/metadata/etcd/helpers.go @@ -7,6 +7,12 @@ import ( "path" ) +//this the an util file to do same basic jobs, such as: +//MkKey(),Uint64ToBytes(),BytesToUint64(),Uint64ToHex() + +//combine the strings with a prefix string +//e.g. it gets "/github.com/freesky/edward" when calling +//MkKey("freesky","edward") while KeyPrefix="/github.com" func MkKey(s ...string) string { s = append([]string{KeyPrefix}, s...) return path.Join(s...) diff --git a/metadata/etcd/key_modify.go b/metadata/etcd/key_modify.go new file mode 100755 index 0000000..4e50d18 --- /dev/null +++ b/metadata/etcd/key_modify.go @@ -0,0 +1,62 @@ +package etcd + +import ( + etcdv3 "github.com/coreos/etcd/clientv3" +) + +// AtomicModifyFunc is a class of commutative functions that, given the current +// state of a key's value `in`, returns the new state of the key `out`, and +// `data` to be returned to the calling function on success, or an `err`. +// +// This function may be run multiple times, if the value has changed in the time +// between getting the data and setting the new value. +type AtomicModifyFunc func(in []byte) (out []byte, data interface{}, err error) + +// TODO(barakmich): Perhaps make this an etcd client library function. +func (c *etcdCtx) AtomicModifyKey(k []byte, f AtomicModifyFunc) (interface{}, error) { + key := string(k) + resp, err := c.etcd.Client.Get(c.getContext(), key) + if err != nil { + return nil, err + } + var version int64 + var value []byte + if len(resp.Kvs) != 1 { + version = 0 + value = []byte{} + } else { + kv := resp.Kvs[0] + version = kv.Version + value = kv.Value + } + for { + newBytes, fval, err := f(value) + if err != nil { + return nil, err + } + txn := c.etcd.Client.Txn(c.getContext()).If( + etcdv3.Compare(etcdv3.Version(key), "=", version), + ).Then( + etcdv3.OpPut(key, string(newBytes)), + ).Else( + etcdv3.OpGet(key), + ) + resp, err := txn.Commit() + if err != nil { + return nil, err + } + if resp.Succeeded { + return fval, nil + } + promAtomicRetries.WithLabelValues(string(key)).Inc() + kv := resp.Responses[0].GetResponseRange().Kvs[0] + version = kv.Version + value = kv.Value + } +} + +//to increase one on the given value. +func BytesAddOne(in []byte) ([]byte, interface{}, error) { + newval := BytesToUint64(in) + 1 + return Uint64ToBytes(newval), newval, nil +} diff --git a/metadata/etcd/ring_watch.go b/metadata/etcd/ring_watch.go old mode 100644 new mode 100755 index d8989e6..3dba04c --- a/metadata/etcd/ring_watch.go +++ b/metadata/etcd/ring_watch.go @@ -7,6 +7,8 @@ import ( "github.com/coreos/torus/ring" ) +//watch the ring updation event. + func (e *Etcd) watchRingUpdates() error { r, err := e.GetRing() if err != nil { diff --git a/models/rpc.pb.go b/models/rpc.pb.go index 1e94f87..3b46f23 100644 --- a/models/rpc.pb.go +++ b/models/rpc.pb.go @@ -568,7 +568,7 @@ var _ grpc.ClientConn // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion2 +//const _ = grpc.SupportPackageIsVersion2 // Client API for TorusStorage service diff --git a/ring/ring_main.go b/ring/ring_main.go old mode 100644 new mode 100755 index 668f8de..495d180 --- a/ring/ring_main.go +++ b/ring/ring_main.go @@ -19,6 +19,7 @@ const ( Ketama ) +//Deserialization byte array into Ring struct. func Unmarshal(b []byte) (torus.Ring, error) { var a models.Ring err := a.Unmarshal(b) @@ -55,10 +56,12 @@ func registerRing(t torus.RingType, name string, newFunc createRingFunc) { ringNames[name] = t } +//call the registed function. also it implementation of createRingFunc func CreateRing(r *models.Ring) (torus.Ring, error) { return ringRegistry[torus.RingType(r.Type)](r) } +//find the RingType via ring name func RingTypeFromString(s string) (torus.RingType, bool) { v, ok := ringNames[s] return v, ok diff --git a/storage/mfile.go b/storage/mfile.go old mode 100644 new mode 100755 index f968039..fe63132 --- a/storage/mfile.go +++ b/storage/mfile.go @@ -36,6 +36,7 @@ type mfileBlock struct { var blankRefBytes = make([]byte, torus.BlockRefByteSize) +//load the reference index map which is one of the member of mfileBlock struct data func loadIndex(m *MFile) (map[torus.BlockRef]int, error) { clog.Infof("loading block index...") var membefore uint64 @@ -61,6 +62,8 @@ func loadIndex(m *MFile) (map[torus.BlockRef]int, error) { return out, nil } +//implementation of blockStore factory method, +//so that storage knows how to create storage instance. func newMFileBlockStore(name string, cfg torus.Config, meta torus.GlobalMetadata) (torus.BlockStore, error) { storageSize := cfg.StorageSize @@ -196,6 +199,7 @@ func (m *mfileBlock) HasBlock(_ context.Context, s torus.BlockRef) (bool, error) return true, nil } +//get a blockdata via BlockRef func (m *mfileBlock) GetBlock(_ context.Context, s torus.BlockRef) ([]byte, error) { m.mut.RLock() defer m.mut.RUnlock() @@ -213,6 +217,7 @@ func (m *mfileBlock) GetBlock(_ context.Context, s torus.BlockRef) ([]byte, erro return m.dataFile.GetBlock(uint64(index)), nil } +//write a the block data into the file func (m *mfileBlock) WriteBlock(_ context.Context, s torus.BlockRef, data []byte) error { m.mut.Lock() defer m.mut.Unlock()