From d27ab2f42e15d4418c2b3db69a23f44395ddfd37 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Wed, 9 Mar 2022 09:42:14 +0500 Subject: [PATCH] Catch LiftBridge cluster error when request metadata. --- core/liftbridge/base.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/core/liftbridge/base.py b/core/liftbridge/base.py index 652e7004d1..c7e5d8094a 100644 --- a/core/liftbridge/base.py +++ b/core/liftbridge/base.py @@ -153,7 +153,6 @@ class GRPCChannel(object): """ Wait until channel became ready or raise ErrorUnavailable - :param channel: :return: """ while True: @@ -236,7 +235,8 @@ class LiftBridgeClient(object): self.open_brokers = list(self.channels) return channel - async def _sleep_on_error(self, delay: float = 1.0, deviation: float = 1.0): + @staticmethod + async def _sleep_on_error(delay: float = 1.0, deviation: float = 1.0): """ Wait random time on error :return: @@ -340,7 +340,14 @@ class LiftBridgeClient(object): req.streams.append(stream) while True: channel = await self.get_channel() - r = await channel.FetchMetadata(req) + try: + r: FetchMetadataResponse = await channel.FetchMetadata(req) + except AioRpcError as e: + logger.info("Failed to get metadata: %s", e) + if e.code() in self.GRPC_RESTARTABLE_CODES: + await self._sleep_on_error(delay=10) + continue + raise e if not r.metadata and wait_for_stream: await asyncio.sleep(1) continue -- GitLab