diff --git a/core/liftbridge/base.py b/core/liftbridge/base.py index 652e7004d1782ecd1a66aeee7aa3f8a5896b8a44..c7e5d8094ac2bcc9c3d0709c2c1ae8adc3333835 100644 --- a/core/liftbridge/base.py +++ b/core/liftbridge/base.py @@ -153,7 +153,6 @@ class GRPCChannel(object): """ Wait until channel became ready or raise ErrorUnavailable - :param channel: :return: """ while True: @@ -236,7 +235,8 @@ class LiftBridgeClient(object): self.open_brokers = list(self.channels) return channel - async def _sleep_on_error(self, delay: float = 1.0, deviation: float = 1.0): + @staticmethod + async def _sleep_on_error(delay: float = 1.0, deviation: float = 1.0): """ Wait random time on error :return: @@ -340,7 +340,14 @@ class LiftBridgeClient(object): req.streams.append(stream) while True: channel = await self.get_channel() - r = await channel.FetchMetadata(req) + try: + r: FetchMetadataResponse = await channel.FetchMetadata(req) + except AioRpcError as e: + logger.info("Failed to get metadata: %s", e) + if e.code() in self.GRPC_RESTARTABLE_CODES: + await self._sleep_on_error(delay=10) + continue + raise e if not r.metadata and wait_for_stream: await asyncio.sleep(1) continue