feat(kv): persist cache and session into disk before shutdown

This commit is contained in:
Aaron Liu
2023-04-16 09:17:06 +08:00
parent 4d131db504
commit b9d9e036c9
12 changed files with 347 additions and 57 deletions

79
pkg/cache/memo.go vendored
View File

@@ -1,6 +1,9 @@
package cache
import (
"encoding/gob"
"fmt"
"os"
"sync"
"time"
@@ -14,18 +17,20 @@ type MemoStore struct {
// item 存储的对象
type itemWithTTL struct {
expires int64
value interface{}
Expires int64
Value interface{}
}
const DefaultCacheFile = "cache_persist.bin"
func newItem(value interface{}, expires int) itemWithTTL {
expires64 := int64(expires)
if expires > 0 {
expires64 = time.Now().Unix() + expires64
}
return itemWithTTL{
value: value,
expires: expires64,
Value: value,
Expires: expires64,
}
}
@@ -40,11 +45,11 @@ func getValue(item interface{}, ok bool) (interface{}, bool) {
return item, true
}
if itemObj.expires > 0 && itemObj.expires < time.Now().Unix() {
if itemObj.Expires > 0 && itemObj.Expires < time.Now().Unix() {
return nil, false
}
return itemObj.value, ok
return itemObj.Value, ok
}
@@ -52,7 +57,7 @@ func getValue(item interface{}, ok bool) (interface{}, bool) {
func (store *MemoStore) GarbageCollect() {
store.Store.Range(func(key, value interface{}) bool {
if item, ok := value.(itemWithTTL); ok {
if item.expires > 0 && item.expires < time.Now().Unix() {
if item.Expires > 0 && item.Expires < time.Now().Unix() {
util.Log().Debug("Cache %q is garbage collected.", key.(string))
store.Store.Delete(key)
}
@@ -98,7 +103,7 @@ func (store *MemoStore) Gets(keys []string, prefix string) (map[string]interface
// Sets 批量设置值
func (store *MemoStore) Sets(values map[string]interface{}, prefix string) error {
for key, value := range values {
store.Store.Store(prefix+key, value)
store.Store.Store(prefix+key, newItem(value, 0))
}
return nil
}
@@ -110,3 +115,61 @@ func (store *MemoStore) Delete(keys []string, prefix string) error {
}
return nil
}
// Persist write memory store into cache
func (store *MemoStore) Persist(path string) error {
persisted := make(map[string]itemWithTTL)
store.Store.Range(func(key, value interface{}) bool {
v, ok := store.Store.Load(key)
if _, ok := getValue(v, ok); ok {
persisted[key.(string)] = v.(itemWithTTL)
}
return true
})
res, err := serializer(persisted)
if err != nil {
return fmt.Errorf("failed to serialize cache: %s", err)
}
err = os.WriteFile(path, res, 0644)
return err
}
// Restore memory cache from disk file
func (store *MemoStore) Restore(path string) error {
if !util.Exists(path) {
return nil
}
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to read cache file: %s", err)
}
defer func() {
f.Close()
os.Remove(path)
}()
persisted := &item{}
dec := gob.NewDecoder(f)
if err := dec.Decode(&persisted); err != nil {
return fmt.Errorf("unknown cache file format: %s", err)
}
items := persisted.Value.(map[string]itemWithTTL)
loaded := 0
for k, v := range items {
if _, ok := getValue(v, true); ok {
loaded++
store.Store.Store(k, v)
} else {
util.Log().Debug("Persisted cache %q is expired.", k)
}
}
util.Log().Info("Restored %d items from %q into memory cache.", loaded, path)
return nil
}