-
Notifications
You must be signed in to change notification settings - Fork 255
refactor: remove unnecessary usage of lru #3204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7ca8de2
1911eb9
b1193fa
dbdd2cc
5ec8178
e5aae15
37ee626
44d8d60
9180a15
5d28492
240a0dd
b580bc5
b619b09
df4d8a3
48c735e
dad4eb2
952339c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,23 +8,11 @@ import ( | |
| "sync" | ||
| "sync/atomic" | ||
|
|
||
| lru "github.com/hashicorp/golang-lru/v2" | ||
| ds "github.com/ipfs/go-datastore" | ||
|
|
||
| "github.com/evstack/ev-node/pkg/store" | ||
| ) | ||
|
|
||
| const ( | ||
| // DefaultItemsCacheSize is the default size for items cache. | ||
| DefaultItemsCacheSize = 200_000 | ||
|
|
||
| // DefaultHashesCacheSize is the default size for hash tracking. | ||
| DefaultHashesCacheSize = 200_000 | ||
|
|
||
| // DefaultDAIncludedCacheSize is the default size for DA inclusion tracking. | ||
| DefaultDAIncludedCacheSize = 200_000 | ||
| ) | ||
|
|
||
| // snapshotEntry is one record in the persisted snapshot. | ||
| // Encoded as 16 bytes: [blockHeight uint64 LE][daHeight uint64 LE]. | ||
| type snapshotEntry struct { | ||
|
|
@@ -34,142 +22,98 @@ type snapshotEntry struct { | |
|
|
||
| const snapshotEntrySize = 16 // bytes per snapshotEntry | ||
|
|
||
| // Cache tracks seen blocks and DA inclusion status using bounded LRU caches. | ||
| type Cache[T any] struct { | ||
| // itemsByHeight stores items keyed by uint64 height. | ||
| // Mutex needed for atomic get-and-remove in getNextItem. | ||
| itemsByHeight *lru.Cache[uint64, *T] | ||
| itemsByHeightMu sync.Mutex | ||
| // Cache tracks seen blocks and DA inclusion status. | ||
| type Cache struct { | ||
| mu sync.Mutex | ||
|
|
||
| // hashes tracks whether a given hash has been seen | ||
| hashes *lru.Cache[string, bool] | ||
| hashes map[string]bool | ||
| daIncluded map[string]uint64 | ||
| hashByHeight map[uint64]string | ||
|
Comment on lines
+29
to
+31
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Split the height index by responsibility.
Also applies to: 61-65, 81-89, 95-103, 128-137 🤖 Prompt for AI Agents |
||
| maxDAHeight *atomic.Uint64 | ||
|
|
||
| // daIncluded maps hash → daHeight. Hash may be a real content hash or a | ||
| // height placeholder (see HeightPlaceholderKey) immediately after restore. | ||
| daIncluded *lru.Cache[string, uint64] | ||
|
|
||
| // hashByHeight maps blockHeight → hash, used for pruning and height-based | ||
| // lookups. Protected by hashByHeightMu only in deleteAllForHeight where a | ||
| // read-then-remove must be atomic. | ||
| hashByHeight *lru.Cache[uint64, string] | ||
| hashByHeightMu sync.Mutex | ||
|
|
||
| // maxDAHeight tracks the maximum DA height seen | ||
| maxDAHeight *atomic.Uint64 | ||
|
|
||
| store store.Store // nil = ephemeral, no persistence | ||
| // storeKeyPrefix is the prefix used for store keys | ||
| store store.Store | ||
| storeKeyPrefix string | ||
| } | ||
|
|
||
| func (c *Cache[T]) snapshotKey() string { | ||
| func (c *Cache) snapshotKey() string { | ||
| return c.storeKeyPrefix + "__snap" | ||
| } | ||
|
|
||
| // NewCache creates a Cache. When store and keyPrefix are set, mutations | ||
| // persist a snapshot so RestoreFromStore can recover in-flight state. | ||
| func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { | ||
| // LRU cache creation only fails if size <= 0, which won't happen with our defaults | ||
| itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize) | ||
| hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize) | ||
| daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize) | ||
| hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize) | ||
|
|
||
| return &Cache[T]{ | ||
| itemsByHeight: itemsCache, | ||
| hashes: hashesCache, | ||
| daIncluded: daIncludedCache, | ||
| hashByHeight: hashByHeightCache, | ||
| func NewCache(s store.Store, keyPrefix string) *Cache { | ||
| return &Cache{ | ||
| hashes: make(map[string]bool), | ||
| daIncluded: make(map[string]uint64), | ||
| hashByHeight: make(map[uint64]string), | ||
| maxDAHeight: &atomic.Uint64{}, | ||
| store: s, | ||
| storeKeyPrefix: keyPrefix, | ||
| } | ||
| } | ||
|
|
||
| // getItem returns an item from the cache by height. | ||
| func (c *Cache[T]) getItem(height uint64) *T { | ||
| item, ok := c.itemsByHeight.Get(height) | ||
| if !ok { | ||
| return nil | ||
| } | ||
| return item | ||
| } | ||
|
|
||
| // setItem sets an item in the cache by height. | ||
| func (c *Cache[T]) setItem(height uint64, item *T) { | ||
| c.itemsByHeight.Add(height, item) | ||
| } | ||
|
|
||
| // getNextItem returns and removes the item at height, or nil if absent. | ||
| func (c *Cache[T]) getNextItem(height uint64) *T { | ||
| c.itemsByHeightMu.Lock() | ||
| defer c.itemsByHeightMu.Unlock() | ||
|
|
||
| item, ok := c.itemsByHeight.Get(height) | ||
| if !ok { | ||
| return nil | ||
| } | ||
| c.itemsByHeight.Remove(height) | ||
| return item | ||
| } | ||
|
|
||
| // itemCount returns the number of items currently stored by height. | ||
| func (c *Cache[T]) itemCount() int { | ||
| return c.itemsByHeight.Len() | ||
| func (c *Cache) isSeen(hash string) bool { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| return c.hashes[hash] | ||
| } | ||
|
|
||
| // isSeen returns true if the hash has been seen. | ||
| func (c *Cache[T]) isSeen(hash string) bool { | ||
| seen, ok := c.hashes.Get(hash) | ||
| return ok && seen | ||
| func (c *Cache) setSeen(hash string, height uint64) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| c.hashes[hash] = true | ||
| c.hashByHeight[height] = hash | ||
| } | ||
|
|
||
| // setSeen sets the hash as seen and tracks its height for pruning. | ||
| func (c *Cache[T]) setSeen(hash string, height uint64) { | ||
| c.hashes.Add(hash, true) | ||
| c.hashByHeight.Add(height, hash) | ||
| func (c *Cache) removeSeen(hash string) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| delete(c.hashes, hash) | ||
| } | ||
|
|
||
| // getDAIncluded returns the DA height if the hash has been DA-included. | ||
| func (c *Cache[T]) getDAIncluded(daCommitmentHash string) (uint64, bool) { | ||
| return c.daIncluded.Get(daCommitmentHash) | ||
| func (c *Cache) getDAIncluded(hash string) (uint64, bool) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| v, ok := c.daIncluded[hash] | ||
| return v, ok | ||
| } | ||
|
|
||
| // getDAIncludedByHeight resolves DA height via the height→hash index. | ||
| // Works for both real hashes (steady state) and snapshot placeholders | ||
| // (post-restart, before the DA retriever re-fires the real hash). | ||
| func (c *Cache[T]) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { | ||
| hash, ok := c.hashByHeight.Get(blockHeight) | ||
| func (c *Cache) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| hash, ok := c.hashByHeight[blockHeight] | ||
| if !ok { | ||
| return 0, false | ||
| } | ||
| return c.getDAIncluded(hash) | ||
| v, exists := c.daIncluded[hash] | ||
| return v, exists | ||
| } | ||
|
|
||
| // setDAIncluded records DA inclusion in memory. | ||
| // If a previous entry already exists at blockHeight (e.g. a placeholder from | ||
| // RestoreFromStore), it is evicted from daIncluded to avoid orphan leaks. | ||
| func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { | ||
| if prev, ok := c.hashByHeight.Get(blockHeight); ok && prev != hash { | ||
| c.daIncluded.Remove(prev) | ||
| func (c *Cache) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| if prev, ok := c.hashByHeight[blockHeight]; ok && prev != hash { | ||
| delete(c.daIncluded, prev) | ||
| } | ||
| c.daIncluded.Add(hash, daHeight) | ||
| c.hashByHeight.Add(blockHeight, hash) | ||
| c.daIncluded[hash] = daHeight | ||
| c.hashByHeight[blockHeight] = hash | ||
| c.setMaxDAHeight(daHeight) | ||
| } | ||
|
|
||
| // removeDAIncluded removes the DA-included status of the hash from the cache. | ||
| func (c *Cache[T]) removeDAIncluded(hash string) { | ||
| c.daIncluded.Remove(hash) | ||
| func (c *Cache) removeDAIncluded(hash string) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| delete(c.daIncluded, hash) | ||
| } | ||
|
|
||
| // daHeight returns the maximum DA height from all DA-included items. | ||
| func (c *Cache[T]) daHeight() uint64 { | ||
| func (c *Cache) daHeight() uint64 { | ||
| return c.maxDAHeight.Load() | ||
| } | ||
|
|
||
| // setMaxDAHeight sets the maximum DA height if the provided value is greater. | ||
| func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { | ||
| func (c *Cache) setMaxDAHeight(daHeight uint64) { | ||
| for range 1_000 { | ||
| current := c.maxDAHeight.Load() | ||
| if daHeight <= current { | ||
|
|
@@ -181,49 +125,41 @@ func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { | |
| } | ||
| } | ||
|
|
||
| // removeSeen removes a hash from the seen cache. | ||
| func (c *Cache[T]) removeSeen(hash string) { | ||
| c.hashes.Remove(hash) | ||
| } | ||
|
|
||
| // deleteAllForHeight removes all items and their associated data from the | ||
| // cache at the given height. | ||
| func (c *Cache[T]) deleteAllForHeight(height uint64) { | ||
| c.itemsByHeight.Remove(height) | ||
|
|
||
| c.hashByHeightMu.Lock() | ||
| hash, ok := c.hashByHeight.Get(height) | ||
| if ok { | ||
| c.hashByHeight.Remove(height) | ||
| func (c *Cache) deleteAllForHeight(height uint64) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| hash, ok := c.hashByHeight[height] | ||
| if !ok { | ||
| return | ||
| } | ||
| c.hashByHeightMu.Unlock() | ||
| delete(c.hashByHeight, height) | ||
| delete(c.hashes, hash) | ||
| delete(c.daIncluded, hash) | ||
| } | ||
|
|
||
| if ok { | ||
| c.hashes.Remove(hash) | ||
| c.daIncluded.Remove(hash) | ||
| } | ||
| func (c *Cache) daIncludedLen() int { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| return len(c.daIncluded) | ||
| } | ||
|
|
||
| // persistSnapshot writes all current in-flight [blockHeight, daHeight] pairs to the store under a single key. | ||
| // Only called explicitly via SaveToStore. NEVER CALL IT ON HOT-PATH TO AVOID BAGER WRITE AMPLIFICATION. | ||
| func (c *Cache[T]) persistSnapshot(ctx context.Context) error { | ||
| func (c *Cache) persistSnapshot(ctx context.Context) error { | ||
| if c.store == nil || c.storeKeyPrefix == "" { | ||
| return nil | ||
| } | ||
|
|
||
| heights := c.hashByHeight.Keys() | ||
| entries := make([]snapshotEntry, 0, len(heights)) | ||
| for _, h := range heights { | ||
| hash, ok := c.hashByHeight.Peek(h) | ||
| if !ok { | ||
| continue | ||
| } | ||
| daH, ok := c.daIncluded.Peek(hash) | ||
| c.mu.Lock() | ||
| entries := make([]snapshotEntry, 0, len(c.hashByHeight)) | ||
| for h, hash := range c.hashByHeight { | ||
| daH, ok := c.daIncluded[hash] | ||
| if !ok { | ||
| continue | ||
| } | ||
| entries = append(entries, snapshotEntry{blockHeight: h, daHeight: daH}) | ||
| } | ||
| c.mu.Unlock() | ||
|
|
||
| return c.store.SetMetadata(ctx, c.snapshotKey(), encodeSnapshot(entries)) | ||
| } | ||
|
|
@@ -257,8 +193,7 @@ func decodeSnapshot(buf []byte) []snapshotEntry { | |
| // RestoreFromStore loads the in-flight snapshot with a single store read. | ||
| // Each entry is installed as a height placeholder; real hashes replace them | ||
| // once the DA retriever re-fires SetHeaderDAIncluded after startup. | ||
| // Missing snapshot key is treated as a no-op (fresh node or pre-snapshot version). | ||
| func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { | ||
| func (c *Cache) RestoreFromStore(ctx context.Context) error { | ||
| if c.store == nil || c.storeKeyPrefix == "" { | ||
| return nil | ||
| } | ||
|
|
@@ -271,10 +206,13 @@ func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { | |
| return fmt.Errorf("reading cache snapshot from store: %w", err) | ||
| } | ||
|
|
||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
| for _, e := range decodeSnapshot(buf) { | ||
| placeholder := HeightPlaceholderKey(c.storeKeyPrefix, e.blockHeight) | ||
| c.daIncluded.Add(placeholder, e.daHeight) | ||
| c.hashByHeight.Add(e.blockHeight, placeholder) | ||
| c.daIncluded[placeholder] = e.daHeight | ||
| c.hashByHeight[e.blockHeight] = placeholder | ||
| c.setMaxDAHeight(e.daHeight) | ||
| } | ||
|
|
||
|
|
@@ -297,7 +235,7 @@ func HeightPlaceholderKey(prefix string, height uint64) string { | |
| } | ||
|
|
||
| // SaveToStore flushes the current snapshot to the store. | ||
| func (c *Cache[T]) SaveToStore(ctx context.Context) error { | ||
| func (c *Cache) SaveToStore(ctx context.Context) error { | ||
| if c.store == nil { | ||
| return nil | ||
| } | ||
|
|
@@ -308,7 +246,7 @@ func (c *Cache[T]) SaveToStore(ctx context.Context) error { | |
| } | ||
|
|
||
| // ClearFromStore deletes the snapshot key from the store. | ||
| func (c *Cache[T]) ClearFromStore(ctx context.Context) error { | ||
| func (c *Cache) ClearFromStore(ctx context.Context) error { | ||
| if c.store == nil { | ||
| return nil | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we have a split design of reading and writing, we should be able to use RW
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had automerge on. I'll fix it in a follow up