PlanOpticon

planopticon / video_processor / utils / api_cache.py
Blame History Raw 231 lines
1
"""Caching system for API responses to reduce API calls and costs."""
2
3
import hashlib
4
import json
5
import logging
6
import os
7
import time
8
from pathlib import Path
9
from typing import Any, Dict, Optional, Union
10
11
logger = logging.getLogger(__name__)


class ApiCache:
    """Disk-based API response cache.

    Each entry is stored as one JSON file inside ``cache_dir / namespace``.
    The file name is the MD5 hex digest of the cache key and the file body is
    a JSON object with two fields: ``timestamp`` (``time.time()`` at write)
    and ``value`` (the cached payload). Entries whose timestamp is more than
    ``ttl`` seconds in the past are reported as misses by :meth:`get`.
    """

    # Upper bounds (in seconds) of the age buckets reported by get_stats();
    # anything beyond the last bound is counted as "older".
    _AGE_BUCKETS = (("1h", 3600), ("6h", 21600), ("24h", 86400))

    def __init__(
        self,
        cache_dir: Union[str, Path],
        namespace: str = "default",
        ttl: int = 86400,  # 24 hours in seconds
    ):
        """
        Initialize API cache.

        Parameters
        ----------
        cache_dir : str or Path
            Directory for cache files
        namespace : str
            Cache namespace for organizing cache files
        ttl : int
            Time-to-live for cache entries in seconds
        """
        self.cache_dir = Path(cache_dir)
        self.namespace = namespace
        self.ttl = ttl

        # Ensure namespace directory exists
        self.namespace_dir = self.cache_dir / namespace
        self.namespace_dir.mkdir(parents=True, exist_ok=True)

        logger.debug("Initialized API cache in %s", self.namespace_dir)

    def get_cache_path(self, key: str) -> Path:
        """
        Get path to cache file for key.

        Parameters
        ----------
        key : str
            Cache key

        Returns
        -------
        Path
            Path to cache file
        """
        # Hash the key to ensure a valid filename. MD5 is used only as a
        # naming scheme here (not for security); it is kept so that cache
        # files written by earlier versions remain addressable.
        hashed_key = hashlib.md5(key.encode()).hexdigest()
        return self.namespace_dir / f"{hashed_key}.json"

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        Parameters
        ----------
        key : str
            Cache key

        Returns
        -------
        object or None
            Cached value if available and not expired, None otherwise.
            Unreadable or malformed cache files are logged and treated as
            misses.
        """
        cache_path = self.get_cache_path(key)

        # EAFP: open directly instead of exists()-then-open, so a file that
        # disappears between the two steps is still just a cache miss.
        try:
            with open(cache_path, "r", encoding="utf-8") as f:
                cache_data = json.load(f)
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("Error reading cache: %s", e)
            return None

        if not isinstance(cache_data, dict):
            logger.warning(
                "Error reading cache: unexpected payload type %s",
                type(cache_data).__name__,
            )
            return None

        # A missing timestamp defaults to 0, i.e. the entry counts as very old.
        timestamp = cache_data.get("timestamp", 0)
        if not isinstance(timestamp, (int, float)):
            logger.warning("Error reading cache: invalid timestamp %r", timestamp)
            return None

        # Check if cache entry is expired
        if time.time() - timestamp > self.ttl:
            logger.debug("Cache entry expired for %s", key)
            return None

        logger.debug("Cache hit for %s", key)
        return cache_data.get("value")

    def set(self, key: str, value: Any) -> bool:
        """
        Set value in cache.

        The entry is serialized in memory first and then moved into place
        with ``os.replace``, so a failed write never truncates or corrupts an
        entry that was already stored under the same key.

        Parameters
        ----------
        key : str
            Cache key
        value : object
            Value to cache (must be JSON serializable)

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        cache_path = self.get_cache_path(key)
        cache_data = {"timestamp": time.time(), "value": value}

        # Serialize before touching the disk: if the value is not JSON
        # serializable, the existing cache file stays untouched.
        try:
            payload = json.dumps(cache_data, ensure_ascii=False)
        except (TypeError, ValueError, RecursionError) as e:
            logger.warning("Error writing to cache: %s", e)
            return False

        # The temp file lives in the same directory (same filesystem) so the
        # final rename is a single replace; its ".tmp" suffix keeps it out of
        # the "*.json" globs used by clear() and get_stats().
        tmp_name = f".{cache_path.stem}.{os.getpid()}.{os.urandom(4).hex()}.tmp"
        tmp_path = cache_path.with_name(tmp_name)

        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
            os.replace(tmp_path, cache_path)
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeEncodeError (e.g. lone surrogates).
            logger.warning("Error writing to cache: %s", e)
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup; the temp file may never have been
                # created, and the failure was already reported above.
                pass
            return False

        logger.debug("Cached value for %s", key)
        return True

    def invalidate(self, key: str) -> bool:
        """
        Invalidate cache entry.

        Parameters
        ----------
        key : str
            Cache key

        Returns
        -------
        bool
            True if entry was removed, False otherwise
        """
        cache_path = self.get_cache_path(key)

        try:
            os.remove(cache_path)
        except FileNotFoundError:
            # Nothing cached under this key.
            return False
        except OSError as e:
            logger.warning("Error invalidating cache: %s", e)
            return False

        logger.debug("Invalidated cache for %s", key)
        return True

    def clear(self, older_than: Optional[int] = None) -> int:
        """
        Clear all cache entries or entries older than specified time.

        Age is measured from each cache file's modification time.

        Parameters
        ----------
        older_than : int, optional
            Clear entries older than this many seconds

        Returns
        -------
        int
            Number of entries cleared
        """
        count = 0
        now = time.time()

        for cache_file in self.namespace_dir.glob("*.json"):
            try:
                # Check file age if criteria provided
                if older_than is not None:
                    file_age = now - cache_file.stat().st_mtime
                    if file_age <= older_than:
                        continue

                cache_file.unlink()
                count += 1
            except OSError as e:
                logger.warning("Error clearing cache file %s: %s", cache_file, e)

        logger.info("Cleared %s cache entries from %s", count, self.namespace)
        return count

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns
        -------
        dict
            Cache statistics with keys ``namespace``, ``entry_count``,
            ``total_size_bytes`` and ``age_distribution`` (counts of files
            whose modification time is within 1h, 6h, 24h, or older).
        """
        now = time.time()
        age_distribution = {"1h": 0, "6h": 0, "24h": 0, "older": 0}
        entry_count = 0
        total_size = 0

        for cache_file in self.namespace_dir.glob("*.json"):
            # One stat() per file yields both size and mtime; a file removed
            # between listing and stat is skipped instead of raising.
            try:
                info = cache_file.stat()
            except OSError:
                continue

            entry_count += 1
            total_size += info.st_size
            file_age = now - info.st_mtime

            for label, limit in self._AGE_BUCKETS:
                if file_age <= limit:
                    age_distribution[label] += 1
                    break
            else:
                age_distribution["older"] += 1

        return {
            "namespace": self.namespace,
            "entry_count": entry_count,
            "total_size_bytes": total_size,
            "age_distribution": age_distribution,
        }
231

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button