Commit f3e1094

Merge pull request #153 from 0chain/fscache
Add minio cache for gateway
2 parents b8f9845 + 3ed5a72 · commit f3e1094

File tree

11 files changed: +623 -74 lines changed


cmd/bucket-listobjects-handlers.go

Lines changed: 173 additions & 11 deletions
@@ -19,9 +19,13 @@ package cmd
 
 import (
 	"context"
+	"log"
 	"net/http"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/gorilla/mux"
 	"github.com/minio/minio/internal/logger"
@@ -46,12 +50,91 @@ func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
 	g.Wait()
 }
 
+func mergeListObjects(l1, l2 []ObjectInfo) []ObjectInfo {
+	mergedMap := make(map[string]ObjectInfo)
+
+	// Helper function to add/update map entries
+	addOrUpdate := func(obj ObjectInfo) {
+		if existingObj, found := mergedMap[obj.Name]; !found || obj.ModTime.After(existingObj.ModTime) {
+			mergedMap[obj.Name] = obj
+		}
+	}
+	for _, obj := range l1 {
+		addOrUpdate(obj)
+	}
+	for _, obj := range l2 {
+		addOrUpdate(obj)
+	}
+
+	mergedList := make([]ObjectInfo, 0, len(mergedMap))
+	for _, obj := range mergedMap {
+		mergedList = append(mergedList, obj)
+	}
+
+	return mergedList
+}
+
+func mergePrefixes(l1, l2 []string) []string {
+	mergedMap := make(map[string]bool)
+
+	// Helper function to add/update map entries
+	addOrUpdate := func(pre string) {
+		if _, found := mergedMap[pre]; !found {
+			mergedMap[pre] = true
+		}
+	}
+	for _, pre := range l1 {
+		addOrUpdate(pre)
+	}
+	for _, pre := range l2 {
+		addOrUpdate(pre)
+	}
+
+	mergedList := make([]string, 0, len(mergedMap))
+	for pre, _ := range mergedMap {
+		mergedList = append(mergedList, pre)
+	}
+
+	return mergedList
+}
+
+func limitMergeObjects(mergeObjects []ObjectInfo, mergePrefixes []string, maxKeys int) ([]ObjectInfo, []string, string) {
+	objPrefixMap := map[string]ObjectInfo{}
+	for _, ob := range mergeObjects {
+		objPrefixMap[ob.Name] = ob
+	}
+	for _, pre := range mergePrefixes {
+		objPrefixMap[pre] = ObjectInfo{IsDir: true}
+	}
+
+	keys := make([]string, 0, len(objPrefixMap))
+	for key := range objPrefixMap {
+		keys = append(keys, key)
+	}
+	sort.Strings(keys)
+	limitedObjs := []ObjectInfo{}
+	limitedPrefixes := []string{}
+	nextMarker := ""
+	for i, key := range keys {
+		if objPrefixMap[key].IsDir {
+			limitedPrefixes = append(limitedPrefixes, key)
+		} else {
+			limitedObjs = append(limitedObjs, objPrefixMap[key])
+		}
+		if i >= (maxKeys - 1) {
+			nextMarker = key
+			break
+		}
+	}
+	return limitedObjs, limitedPrefixes, nextMarker
+}
+
 // Validate all the ListObjects query arguments, returns an APIErrorCode
 // if one of the args do not meet the required conditions.
 // Special conditions required by MinIO server are as below
-//  - delimiter if set should be equal to '/', otherwise the request is rejected.
-//  - marker if set should have a common prefix with 'prefix' param, otherwise
-//    the request is rejected.
+//   - delimiter if set should be equal to '/', otherwise the request is rejected.
+//   - marker if set should have a common prefix with 'prefix' param, otherwise
+//     the request is rejected.
 func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int) APIErrorCode {
 	// Max keys cannot be negative.
 	if maxKeys < 0 {
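Taken together, mergeListObjects and mergePrefixes dedupe the cache and backend listings (for objects, the entry with the newer ModTime wins), and limitMergeObjects flattens objects and prefixes into one lexically sorted key space, cuts it off at maxKeys, and reports the last emitted key as the next marker. Below is a minimal standalone sketch of the merge-by-newest idea, assuming only the Name and ModTime fields of ObjectInfo; objInfo and mergeByNewest are hypothetical stand-ins, not code from this commit:

package main

import (
	"fmt"
	"sort"
	"time"
)

// objInfo is a minimal stand-in for the ObjectInfo fields the merge helpers touch.
type objInfo struct {
	Name    string
	ModTime time.Time
}

// mergeByNewest mirrors the shape of mergeListObjects: dedupe by Name, keep the
// newer ModTime, then sort by name for a stable, listing-friendly order.
func mergeByNewest(backend, cached []objInfo) []objInfo {
	m := map[string]objInfo{}
	for _, o := range append(append([]objInfo{}, backend...), cached...) {
		if cur, ok := m[o.Name]; !ok || o.ModTime.After(cur.ModTime) {
			m[o.Name] = o
		}
	}
	out := make([]objInfo, 0, len(m))
	for _, o := range m {
		out = append(out, o)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	t0 := time.Now()
	backend := []objInfo{{Name: "a.txt", ModTime: t0}, {Name: "b.txt", ModTime: t0}}
	cached := []objInfo{{Name: "a.txt", ModTime: t0.Add(time.Minute)}} // newer copy wins
	for _, o := range mergeByNewest(backend, cached) {
		fmt.Println(o.Name, o.ModTime.Unix())
	}
}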
@@ -200,6 +283,11 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
 // NOTE: It is recommended that this API to be used for application development.
 // MinIO continues to support ListObjectsV1 for supporting legacy tools.
 func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
+	st := time.Now()
+	defer func() {
+		elapsed := time.Since(st).Milliseconds()
+		log.Printf("ListObjectsV2Handler took %d ms\n", elapsed)
+	}()
 	ctx := newContext(r, w, "ListObjectsV2")
 
 	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
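Both list handlers now log their wall-clock time through a deferred closure. A self-contained sketch of that defer-based timing pattern follows; the timed helper and the sleep standing in for handler work are illustrative only, not part of the commit:

package main

import (
	"log"
	"time"
)

// timed returns a func suitable for defer that logs elapsed milliseconds,
// the same measurement the handlers perform inline.
func timed(name string) func() {
	st := time.Now()
	return func() {
		log.Printf("%s took %d ms", name, time.Since(st).Milliseconds())
	}
}

func main() {
	defer timed("ListObjectsV2Handler")()
	time.Sleep(50 * time.Millisecond) // stand-in for the real handler body
}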
@@ -235,9 +323,17 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
 	}
 
 	var (
-		listObjectsV2Info ListObjectsV2Info
-		err               error
+		listObjectsV2Info      ListObjectsV2Info
+		listObjectsV2InfoCache ListObjectsV2Info
+		err                    error
+		errC                   error
 	)
+	listObjectsV2Cache := objectAPI.ListObjectsV2
+	cacheEnabled := false
+	if api.CacheAPI() != nil {
+		cacheEnabled = true
+		listObjectsV2Cache = api.CacheAPI().ListObjectsV2
+	}
 
 	if r.Header.Get(xMinIOExtract) == "true" && strings.Contains(prefix, archivePattern) {
 		// Inititate a list objects operation inside a zip file based in the input params
@@ -246,13 +342,38 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
 		// Inititate a list objects operation based on the input params.
 		// On success would return back ListObjectsInfo object to be
 		// marshaled into S3 compatible XML header.
-		listObjectsV2Info, err = objectAPI.ListObjectsV2(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
+		var wg sync.WaitGroup
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			listObjectsV2Info, err = objectAPI.ListObjectsV2(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
+		}()
+		go func() {
+			defer wg.Done()
+			if cacheEnabled {
+				stc := time.Now()
+				listObjectsV2InfoCache, errC = listObjectsV2Cache(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
+				elap := time.Since(stc)
+				log.Println("ListV2 object cache time", elap)
+			}
+		}()
+		wg.Wait()
 	}
-	if err != nil {
+	if err != nil || errC != nil {
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
 		return
 	}
 
+	mergeObjects := mergeListObjects(listObjectsV2Info.Objects, listObjectsV2InfoCache.Objects)
+	mergePrefixes := mergePrefixes(listObjectsV2Info.Prefixes, listObjectsV2InfoCache.Prefixes)
+	limitedObjects, limitedPrefix, nextMarker := limitMergeObjects(mergeObjects, mergePrefixes, maxKeys)
+	listObjectsV2Info.Objects = limitedObjects
+	listObjectsV2Info.Prefixes = limitedPrefix
+	if nextMarker != "" {
+		listObjectsV2Info.NextContinuationToken = nextMarker
+		listObjectsV2Info.IsTruncated = true
+	}
+
 	concurrentDecryptETag(ctx, listObjectsV2Info.Objects)
 
 	response := generateListObjectsV2Response(bucket, prefix, token, listObjectsV2Info.NextContinuationToken, startAfter,
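The handler now issues the backend listing and the cache listing concurrently and waits for both before merging, so the extra cache lookup costs roughly the slower of the two calls rather than their sum. A stripped-down sketch of that sync.WaitGroup fan-out, with hypothetical listBackend and listCache functions standing in for objectAPI.ListObjectsV2 and the cache layer's ListObjectsV2:

package main

import (
	"fmt"
	"sync"
	"time"
)

type listing struct{ keys []string }

// Hypothetical stand-ins for the backend and cache listing calls.
func listBackend() (listing, error) {
	time.Sleep(20 * time.Millisecond) // simulate the slower remote listing
	return listing{keys: []string{"a", "b"}}, nil
}

func listCache() (listing, error) {
	time.Sleep(2 * time.Millisecond) // the local cache answers faster
	return listing{keys: []string{"a", "c"}}, nil
}

func main() {
	var (
		backend, cached listing
		err, errC       error
		wg              sync.WaitGroup
	)
	wg.Add(2)
	go func() { defer wg.Done(); backend, err = listBackend() }()
	go func() { defer wg.Done(); cached, errC = listCache() }()
	wg.Wait() // total latency is roughly max(backend, cache), not their sum

	if err != nil || errC != nil {
		fmt.Println("listing failed:", err, errC)
		return
	}
	fmt.Println("backend:", backend.keys, "cache:", cached.keys) // merged downstream
}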
@@ -306,8 +427,12 @@ func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http
 // This implementation of the GET operation returns some or all (up to 1000)
 // of the objects in a bucket. You can use the request parameters as selection
 // criteria to return a subset of the objects in a bucket.
-//
 func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
+	st := time.Now()
+	defer func() {
+		elapsed := time.Since(st).Milliseconds()
+		log.Printf("ListObjectsV1Handler took %d ms\n", elapsed)
+	}()
 	ctx := newContext(r, w, "ListObjectsV1")
 
 	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
@@ -340,15 +465,52 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
 	}
 
 	listObjects := objectAPI.ListObjects
-
+	listObjectsCache := objectAPI.ListObjects
+	cacheEnabled := false
+	if api.CacheAPI() != nil {
+		cacheEnabled = true
+		listObjectsCache = api.CacheAPI().ListObjects
+	}
 	// Inititate a list objects operation based on the input params.
 	// On success would return back ListObjectsInfo object to be
 	// marshaled into S3 compatible XML header.
-	listObjectsInfo, err := listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
-	if err != nil {
+	var (
+		listObjectsInfo      ListObjectsInfo
+		listObjectsInfoCache ListObjectsInfo
+		err                  error
+		errC                 error
+	)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		listObjectsInfo, err = listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
+	}()
+	go func() {
+		defer wg.Done()
+		if cacheEnabled {
+			stc := time.Now()
+			listObjectsInfoCache, errC = listObjectsCache(ctx, bucket, prefix, marker, delimiter, maxKeys)
+			elap := time.Since(stc)
+			log.Println("ListV1 object cache time", elap)
+		}
+	}()
+
+	wg.Wait()
+
+	if err != nil || errC != nil {
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
 		return
 	}
+	mergeObjects := mergeListObjects(listObjectsInfo.Objects, listObjectsInfoCache.Objects)
+	mergePrefixes := mergePrefixes(listObjectsInfo.Prefixes, listObjectsInfoCache.Prefixes)
+	limitedObjects, limitedPrefix, nextMarker := limitMergeObjects(mergeObjects, mergePrefixes, maxKeys)
+	listObjectsInfo.Objects = limitedObjects
+	listObjectsInfo.Prefixes = limitedPrefix
+	if nextMarker != "" {
+		listObjectsInfo.NextMarker = nextMarker
+		listObjectsInfo.IsTruncated = true
+	}
 
 	concurrentDecryptETag(ctx, listObjectsInfo.Objects)
 
cmd/disk-cache-backend.go

Lines changed: 20 additions & 3 deletions
@@ -27,6 +27,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"os"
 	"path"
@@ -205,7 +206,7 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
 		commitWriteback:    config.CacheCommitMode == CommitWriteBack,
 		commitWritethrough: config.CacheCommitMode == CommitWriteThrough,
 
-		retryWritebackCh: make(chan ObjectInfo, 10000),
+		retryWritebackCh: make(chan ObjectInfo, 100000),
 		online:           1,
 		pool: sync.Pool{
 			New: func() interface{} {
@@ -218,7 +219,22 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
 	go cache.purgeWait(ctx)
 	go cache.cleanupStaleUploads(ctx)
 	if cache.commitWriteback {
-		go cache.scanCacheWritebackFailures(ctx)
+		go func() {
+			tickInterval := time.Duration(config.WriteBackInterval) * time.Second
+			log.Println("write back time interval", tickInterval)
+			ticker := time.NewTicker(tickInterval)
+			defer ticker.Stop()
+			defer close(cache.retryWritebackCh)
+			for {
+				select {
+				case <-ticker.C:
+					cache.scanCacheWritebackFailures(ctx)
+				case <-ctx.Done():
+					return
+				}
+			}
+		}()
+		//go cache.scanCacheWritebackFailures(ctx)
 	}
 	cache.diskSpaceAvailable(0) // update if cache usage is already high.
 	cache.NewNSLockFn = func(cachePath string) RWLocker {
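With writeback commit enabled, failed uploads are now rescanned on a fixed interval (config.WriteBackInterval, presumably introduced in one of the other files of this commit) instead of once at startup. A self-contained sketch of the ticker loop follows; runWritebackRetry, the three-second timeout, and the logging scan func are illustrative stand-ins for the goroutine above and cache.scanCacheWritebackFailures:

package main

import (
	"context"
	"log"
	"time"
)

// runWritebackRetry calls scan on every tick until the context is cancelled,
// mirroring the retry goroutine started in newDiskCache.
func runWritebackRetry(ctx context.Context, interval time.Duration, scan func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			scan(ctx) // re-queue any writeback uploads that previously failed
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runWritebackRetry(ctx, time.Second, func(context.Context) {
		log.Println("scanning cache for writeback failures")
	})
}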
@@ -1225,7 +1241,8 @@ func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
 
 // queues writeback upload failures on server startup
 func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
-	defer close(c.retryWritebackCh)
+	log.Println("scan cache write back failures")
+	//defer close(c.retryWritebackCh) // don't close the channel
 	filterFn := func(name string, typ os.FileMode) error {
 		if name == minioMetaBucket {
 			// Proceed to next file.
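The defer close(c.retryWritebackCh) is dropped here because the scan now runs on every tick: the channel has to outlive a single pass, so its close is owned by the ticker goroutine in newDiskCache instead. A tiny illustrative sketch (not from the commit) of the failure mode that closing after the first scan would invite:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	close(ch) // what would happen if the first scan still closed the retry channel
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // send on closed channel
		}
	}()
	ch <- 1 // a later scan's send would panic like this
}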
