PYTHON_JOB_INTERVIEW Telegram 1150
📌 Сложная задача для собеседования на Python: «Асинхронный кэш с TTL и инвалидацией».

Условие:
Реализуйте потокобезопасный асинхронный кэш с временем жизни записей, поддержкой инвалидации по ключу и автоматическим удалением устаревших записей. Кэш должен:
1️⃣ Хранить значения не дольше указанного TTL (seconds)
2️⃣ Автоматически очищать устаревшие записи без блокировки основного потока
3️⃣ Поддерживать асинхронные операции get/set
4️⃣ Иметь механизм ручной инвалидации
5️⃣ Гарантировать потокобезопасность
6️⃣ Оптимизировать память

Ожидаемое решение:
import asyncio
import time
from collections import OrderedDict
from typing import Any, Optional
import threading

class AsyncTTLCache:
def __init__(self, maxsize: int = 1024, ttl: int = 60):
self._cache = OrderedDict()
self._maxsize = maxsize
self._ttl = ttl
self._lock = threading.Lock()
self._cleanup_task = asyncio.create_task(self._cleanup_expired())

async def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self._cache:
return None
value, expiry = self._cache[key]
if time.time() > expiry:
del self._cache[key]
return None
# Move to end to mark as recently used
self._cache.move_to_end(key)
return value

async def set(self, key: str, value: Any) -> None:
with self._lock:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, time.time() + self._ttl)
if len(self._cache) > self._maxsize:
self._cache.popitem(last=False)

async def invalidate(self, key: str) -> None:
with self._lock:
if key in self._cache:
del self._cache[key]

async def _cleanup_expired(self) -> None:
while True:
await asyncio.sleep(self._ttl)
with self._lock:
now = time.time()
expired_keys = [
k for k, (_, expiry) in self._cache.items()
if expiry <= now
]
for k in expired_keys:
del self._cache[k]

def __del__(self):
self._cleanup_task.cancel()


# Пример использования
async def main():
cache = AsyncTTLCache(ttl=2)

await cache.set("a", 1)
print(await cache.get("a")) # 1

await asyncio.sleep(3)
print(await cache.get("a")) # None

await cache.set("b", 2)
await cache.invalidate("b")
print(await cache.get("b")) # None

asyncio.run(main())

Предлагайте свои варианты решения в комментариях⏬️

@python_job_interview
🔥176👍5😱3



tgoop.com/python_job_interview/1150
Create:
Last Update:

📌 Сложная задача для собеседования на Python: «Асинхронный кэш с TTL и инвалидацией».

Условие:
Реализуйте потокобезопасный асинхронный кэш с временем жизни записей, поддержкой инвалидации по ключу и автоматическим удалением устаревших записей. Кэш должен:
1️⃣ Хранить значения не дольше указанного TTL (seconds)
2️⃣ Автоматически очищать устаревшие записи без блокировки основного потока
3️⃣ Поддерживать асинхронные операции get/set
4️⃣ Иметь механизм ручной инвалидации
5️⃣ Гарантировать потокобезопасность
6️⃣ Оптимизировать память

Ожидаемое решение:

import asyncio
import time
from collections import OrderedDict
from typing import Any, Optional
import threading

class AsyncTTLCache:
def __init__(self, maxsize: int = 1024, ttl: int = 60):
self._cache = OrderedDict()
self._maxsize = maxsize
self._ttl = ttl
self._lock = threading.Lock()
self._cleanup_task = asyncio.create_task(self._cleanup_expired())

async def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self._cache:
return None
value, expiry = self._cache[key]
if time.time() > expiry:
del self._cache[key]
return None
# Move to end to mark as recently used
self._cache.move_to_end(key)
return value

async def set(self, key: str, value: Any) -> None:
with self._lock:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, time.time() + self._ttl)
if len(self._cache) > self._maxsize:
self._cache.popitem(last=False)

async def invalidate(self, key: str) -> None:
with self._lock:
if key in self._cache:
del self._cache[key]

async def _cleanup_expired(self) -> None:
while True:
await asyncio.sleep(self._ttl)
with self._lock:
now = time.time()
expired_keys = [
k for k, (_, expiry) in self._cache.items()
if expiry <= now
]
for k in expired_keys:
del self._cache[k]

def __del__(self):
self._cleanup_task.cancel()


# Пример использования
async def main():
cache = AsyncTTLCache(ttl=2)

await cache.set("a", 1)
print(await cache.get("a")) # 1

await asyncio.sleep(3)
print(await cache.get("a")) # None

await cache.set("b", 2)
await cache.invalidate("b")
print(await cache.get("b")) # None

asyncio.run(main())

Предлагайте свои варианты решения в комментариях⏬️

@python_job_interview

BY Python вопросы с собеседований


Share with your friend now:
tgoop.com/python_job_interview/1150

View MORE
Open in Telegram


Telegram News

Date: |

4How to customize a Telegram channel? How to create a business channel on Telegram? (Tutorial) Just at this time, Bitcoin and the broader crypto market have dropped to new 2022 lows. The Bitcoin price has tanked 10 percent dropping to $20,000. On the other hand, the altcoin space is witnessing even more brutal correction. Bitcoin has dropped nearly 60 percent year-to-date and more than 70 percent since its all-time high in November 2021. Telegram message that reads: "Bear Market Screaming Therapy Group. You are only allowed to send screaming voice notes. Everything else = BAN. Text pics, videos, stickers, gif = BAN. Anything other than screaming = BAN. You think you are smart = BAN. Click “Save” ;
from us


Telegram Python вопросы с собеседований
FROM American