Source code for cryptoolz.factory

# postpone evaluation of annotations
from __future__ import annotations

from typing import Any, Generic, TypeVar, Optional, Callable, Generator, List

from .base import AdHoc

from queue import Queue

FactoryItem = TypeVar("FactoryItem")


[docs]class Factory(Queue, Generic[FactoryItem]): constructor: Callable[..., FactoryItem] _handle_exception: Callable[[Exception], None] defaults: Optional[dict[str, Any]] = None stored: List[FactoryItem] = [] def __init__( self, constructor: Callable[..., FactoryItem], exception_handler: Callable[[Exception], None] = None, defaults: Optional[dict[str, Any]] = None, maxsize=0, ): super().__init__(maxsize) self.constructor = constructor self._handle_exception = exception_handler if defaults: self.defaults = defaults
[docs] def stream(self, some: int = 0) -> Generator[FactoryItem, None, None]: qsize: int = self.qsize() some = some or qsize predicate = min if qsize else max for _ in range(predicate(qsize, some)): yield self.create()
[docs] def create(self, store: bool = False) -> FactoryItem: result = None data_dict = self.defaults if self.empty() else self.get() args = data_dict.get("args") or [] data_dict.pop("args", None) try: result = self.constructor(*args, **data_dict) except Exception as e: if self._handle_exception: self._handle_exception(e) else: raise e if store: self.stored.append(result) return result
@AdHoc.ListMorph(0) def put( self, item: Any, block: bool = True, timeout: Optional[float] = None, ): super().put(item, block=block, timeout=timeout)