client.py 14.1 KB
Newer Older
Dmitry Volodin's avatar
Dmitry Volodin committed
1
2
3
# ----------------------------------------------------------------------
# HTTP Client
# ----------------------------------------------------------------------
4
# Copyright (C) 2007-2022 The NOC Project
Dmitry Volodin's avatar
Dmitry Volodin committed
5
6
7
8
9
# See LICENSE for details
# ----------------------------------------------------------------------

# Python modules
import socket
10
import threading
11
import ssl
12
import logging
13
import zlib
14
15
import time
import struct
Dmitry Volodin's avatar
Dmitry Volodin committed
16
import codecs
Dmitry Volodin's avatar
Dmitry Volodin committed
17
from urllib.parse import urlparse
Dmitry Volodin's avatar
Dmitry Volodin committed
18
import asyncio
19
import random
Dmitry Volodin's avatar
Dmitry Volodin committed
20

Dmitry Volodin's avatar
Dmitry Volodin committed
21
# Third-party modules
22
import cachetools
Dmitry Volodin's avatar
Dmitry Volodin committed
23
import orjson
Dmitry Volodin's avatar
Dmitry Volodin committed
24
from typing import Optional, List, Tuple, Any, Dict
Dmitry Volodin's avatar
Dmitry Volodin committed
25

Dmitry Volodin's avatar
Dmitry Volodin committed
26
27
# NOC modules
from noc.core.perf import metrics
kk's avatar
kk committed
28
from noc.core.validators import is_ipv4
29
from .proxy import SYSTEM_PROXIES
Aleksey Shirokih's avatar
Aleksey Shirokih committed
30
from noc.config import config
Dmitry Volodin's avatar
Dmitry Volodin committed
31
from noc.core.comp import smart_bytes, smart_text
Dmitry Volodin's avatar
Dmitry Volodin committed
32
33
from noc.core.ioloop.util import run_sync

Dmitry Volodin's avatar
Dmitry Volodin committed
34
from http_parser.parser import HttpParser
Aleksey Shirokih's avatar
Aleksey Shirokih committed
35

36
logger = logging.getLogger(__name__)

# Defaults are read from the http_client section of the NOC config
DEFAULT_CONNECT_TIMEOUT = config.http_client.connect_timeout
DEFAULT_REQUEST_TIMEOUT = config.http_client.request_timeout
DEFAULT_USER_AGENT = config.http_client.user_agent
DEFAULT_BUFFER_SIZE = config.http_client.buffer_size
DEFAULT_MAX_REDIRECTS = config.http_client.max_redirects

# Pseudo HTTP status codes returned by fetch() in place of a server response
ERR_TIMEOUT = 599  # Resolve/connect/send failure or timeout
ERR_READ_TIMEOUT = 598  # Timeout or connection reset while reading response
ERR_PARSE_ERROR = 597  # Malformed response

DEFAULT_PORTS = {"http": config.http_client.http_port, "https": config.http_client.https_port}

# Methods require Content-Length header
REQUIRE_LENGTH_METHODS = {"POST", "PUT"}

# Cache of resolved addresses (host -> list of addresses), guarded by ns_lock
ns_lock = threading.Lock()
ns_cache = cachetools.TTLCache(
    config.http_client.ns_cache_size, ttl=config.http_client.resolver_ttl
)

# Supported request body encodings
CE_DEFLATE = "deflate"
CE_GZIP = "gzip"
60

61

Dmitry Volodin's avatar
Dmitry Volodin committed
62
async def resolve(host):
    """
    Resolve host and return one of its IPv4 addresses.

    Results are kept in the module-level `ns_cache`; when several
    addresses are known, one is picked at random on every call.

    :param host: Host name to resolve
    :return: IPv4 address as a string, or None when the host
        cannot be resolved to an IPv4 address
    """
    with ns_lock:
        addrs = ns_cache.get(host)
    if addrs:
        return random.choice(addrs)
    try:
        # Only IPv4 addresses are used, so ask the resolver for them directly
        addr_info = await asyncio.get_running_loop().getaddrinfo(
            host, None, family=socket.AF_INET, proto=socket.IPPROTO_TCP
        )
    except socket.gaierror:
        return None
    # Each entry is (family, type, proto, canonname, sockaddr)
    addrs = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
    if not addrs:
        return None
    with ns_lock:
        ns_cache[host] = addrs
    return random.choice(addrs)

Dmitry Volodin's avatar
Dmitry Volodin committed
85

Dmitry Volodin's avatar
Dmitry Volodin committed
86
async def fetch(
    url: str,
    method: str = "GET",
    headers=None,
    body: Optional[bytes] = None,
    connect_timeout=DEFAULT_CONNECT_TIMEOUT,
    request_timeout=DEFAULT_REQUEST_TIMEOUT,
    resolver=resolve,
    max_buffer_size=DEFAULT_BUFFER_SIZE,
    follow_redirects: bool = False,
    max_redirects=DEFAULT_MAX_REDIRECTS,
    validate_cert=config.http_client.validate_certs,
    allow_proxy: bool = False,
    proxies=None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    content_encoding: Optional[str] = None,
    eof_mark: Optional[bytes] = None,
) -> Tuple[int, Dict[str, Any], bytes]:
    """
    Perform a single HTTP/HTTPS request over a fresh connection.

    Transport-level failures are not raised; they are reported
    through the pseudo status codes ERR_TIMEOUT, ERR_READ_TIMEOUT
    and ERR_PARSE_ERROR with a short message as the body.

    :param url: Fetch URL
    :param method: request method "GET", "POST", "PUT" etc
    :param headers: Dict of additional headers, override the generated ones
    :param body: Request body for POST and PUT request. Anything that
      is not str/bytes is serialized to JSON
    :param connect_timeout: Timeout for establishing the connection
    :param request_timeout: Timeout applied to every send/receive step
    :param resolver: Coroutine function turning a host name into an address
      (or an (address, port) tuple)
    :param follow_redirects: Follow 3xx responses
    :param max_redirects: Amount of redirects left to follow
    :param validate_cert: Verify server certificate and host name
    :param allow_proxy: Use a proxy for the URL scheme when one is configured
    :param proxies: Mapping scheme -> proxy address, SYSTEM_PROXIES when empty
    :param user: User name for basic authentication
    :param password: Password for basic authentication
    :param max_buffer_size: Maximal amount of bytes to read at once
    :param content_encoding: Compress request body, "deflate" or "gzip"
    :param eof_mark: Do not consider connection reset as error if
      eof_mark received (string or list)
    :return: code, headers, body
    :raises ValueError: When the URL contains an invalid port
    """

    def get_connect_options():
        """Build additional keyword arguments for asyncio.open_connection"""
        opts = {}
        # NOTE: connection to the proxy itself is always plain, and no TLS
        # is negotiated after CONNECT either
        if use_tls and not proxy:
            ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
            if validate_cert:
                ctx.check_hostname = True
                ctx.verify_mode = ssl.CERT_REQUIRED
            else:
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            opts["ssl"] = ctx
            if host:
                # We connect to the resolved address, so the host name
                # must be passed explicitly for SNI and certificate matching
                opts["server_hostname"] = host
        return opts

    metrics["httpclient_requests", ("method", method.lower())] += 1
    # Normalize eof_mark to a tuple of non-empty binary marks
    if not eof_mark:
        eof_marks = ()
    elif isinstance(eof_mark, (str, bytes)):
        eof_marks = (smart_bytes(eof_mark),)
    else:
        eof_marks = tuple(smart_bytes(m) for m in eof_mark if m)
    # Parse URL
    u = urlparse(str(url))
    use_tls = u.scheme == "https"
    proto = "HTTPS" if use_tls else "HTTP"
    logger.debug("%s %s %s", proto, method, url)
    # hostname/port attributes cope with userinfo and bracketed literals,
    # unlike splitting netloc by ":". Invalid port raises ValueError.
    host = u.hostname or ""
    port = u.port
    if port is None:
        port = DEFAULT_PORTS.get(u.scheme)
        if not port:
            return ERR_TIMEOUT, {}, b"Cannot resolve port for scheme: %s" % smart_bytes(u.scheme)
    if is_ipv4(host):
        addr = host
    else:
        addr = await resolver(host)
    if not addr:
        return ERR_TIMEOUT, {}, b"Cannot resolve host: %s" % smart_bytes(host)
    # Detect proxy server
    if allow_proxy:
        proxy = (proxies or SYSTEM_PROXIES).get(u.scheme)
    else:
        proxy = None
    # Connect
    reader, writer = None, None
    if proxy:
        connect_address = proxy
    elif isinstance(addr, tuple):
        connect_address = addr
    else:
        connect_address = (addr, port)
    try:
        try:
            if proxy:
                logger.debug("Connecting to proxy %s:%s", connect_address[0], connect_address[1])
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(
                    connect_address[0], connect_address[1], **get_connect_options()
                ),
                connect_timeout,
            )
        except ConnectionRefusedError:
            metrics["httpclient_timeouts"] += 1
            return ERR_TIMEOUT, {}, b"Connection refused"
        except OSError as e:
            metrics["httpclient_timeouts"] += 1
            return ERR_TIMEOUT, {}, b"Connection error: %s" % smart_bytes(e)
        except asyncio.TimeoutError:
            metrics["httpclient_timeouts"] += 1
            return ERR_TIMEOUT, {}, b"Connection timed out"
        # Proxy CONNECT
        if proxy:
            logger.debug("Sending CONNECT %s:%s", addr, port)
            # Send CONNECT request
            req = b"CONNECT %s:%s HTTP/1.1\r\nUser-Agent: %s\r\n\r\n" % (
                smart_bytes(addr),
                smart_bytes(port),
                smart_bytes(DEFAULT_USER_AGENT),
            )
            writer.write(smart_bytes(req))
            try:
                await asyncio.wait_for(writer.drain(), request_timeout)
            except (asyncio.TimeoutError, TimeoutError):
                metrics["httpclient_proxy_timeouts"] += 1
                return ERR_TIMEOUT, {}, b"Timed out while sending request to proxy"
            # Wait for proxy response
            parser = HttpParser()
            while not parser.is_headers_complete():
                try:
                    data = await asyncio.wait_for(reader.read(max_buffer_size), request_timeout)
                except (asyncio.TimeoutError, TimeoutError):
                    metrics["httpclient_proxy_timeouts"] += 1
                    return ERR_TIMEOUT, {}, b"Timed out while sending request to proxy"
                if not data:
                    # EOF before the headers are complete. Without this
                    # check the loop would spin forever on empty reads.
                    return ERR_READ_TIMEOUT, {}, b"Connection reset by proxy"
                received = len(data)
                parsed = parser.execute(data, received)
                if parsed != received:
                    return ERR_PARSE_ERROR, {}, b"Parse error"
            code = parser.get_status_code()
            logger.debug("Proxy response: %s", code)
            if not 200 <= code <= 299:
                return code, parser.get_headers(), b"Proxy error: %s" % smart_bytes(code)
        # Process request
        body = body or ""
        content_type = "application/binary"
        if not isinstance(body, (str, bytes)):
            body = smart_text(orjson.dumps(body))
            content_type = "application/json"
        body = smart_bytes(body)  # Here and below body is binary
        # Credentials from the URL must not leak into the Host header
        host_header = str(u.netloc).rsplit("@", 1)[-1]
        h = {"Host": host_header, "Connection": "close", "User-Agent": DEFAULT_USER_AGENT}
        if body and content_encoding:
            if content_encoding == CE_DEFLATE:
                # Deflate compression
                h["Content-Encoding"] = CE_DEFLATE
                compress = zlib.compressobj(
                    zlib.Z_DEFAULT_COMPRESSION,
                    zlib.DEFLATED,
                    -zlib.MAX_WBITS,
                    zlib.DEF_MEM_LEVEL,
                    zlib.Z_DEFAULT_STRATEGY,
                )
                body = compress.compress(body) + compress.flush()
            elif content_encoding == CE_GZIP:
                # gzip compression: raw deflate stream wrapped
                # into hand-made gzip header and trailer
                h["Content-Encoding"] = CE_GZIP
                compress = zlib.compressobj(
                    6, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0
                )
                crc = zlib.crc32(body, 0) & 0xFFFFFFFF
                # Trailer carries CRC and length of the *uncompressed* body
                body = b"\x1f\x8b\x08\x00%s\x02\xff%s%s%s%s" % (
                    to32u(int(time.time())),
                    compress.compress(body),
                    compress.flush(),
                    to32u(crc),
                    to32u(len(body)),
                )
        if method in REQUIRE_LENGTH_METHODS:
            h["Content-Length"] = str(len(body))
            h["Content-Type"] = content_type
        if user and password:
            # Include basic auth header
            uh = smart_text("%s:%s" % (user, password))
            # base64 codec wraps its output into lines,
            # line feeds are not allowed within the header value
            token = codecs.encode(uh.encode("utf-8"), "base64").replace(b"\n", b"")
            h["Authorization"] = b"Basic %s" % token
        if headers:
            h.update(headers)
        # Request target must not be empty
        path = u.path or "/"
        if u.query:
            path += "?%s" % u.query
        req = b"%s %s HTTP/1.1\r\n%s\r\n\r\n%s" % (
            smart_bytes(method),
            smart_bytes(path),
            b"\r\n".join(b"%s: %s" % (smart_bytes(k), smart_bytes(h[k])) for k in h),
            body,
        )
        try:
            writer.write(req)
            await asyncio.wait_for(writer.drain(), request_timeout)
        except ConnectionResetError:
            metrics["httpclient_timeouts"] += 1
            return ERR_TIMEOUT, {}, b"Connection reset while sending request"
        except (asyncio.TimeoutError, TimeoutError):
            metrics["httpclient_timeouts"] += 1
            return ERR_TIMEOUT, {}, b"Timed out while sending request"
        parser = HttpParser()
        response_body: List[bytes] = []
        while not parser.is_message_complete():
            try:
                data = await asyncio.wait_for(reader.read(max_buffer_size), request_timeout)
                is_eof = not data
            except (asyncio.IncompleteReadError, ConnectionResetError):
                is_eof = True
            except (asyncio.TimeoutError, TimeoutError):
                metrics["httpclient_timeouts"] += 1
                return ERR_READ_TIMEOUT, {}, b"Request timed out"
            if is_eof:
                if eof_marks and response_body:
                    # Check if EOF mark is in received data.
                    # Chunks are merged, as the mark may span several reads.
                    merged = b"".join(response_body)
                    response_body = [merged]
                    if any(mark in merged for mark in eof_marks):
                        break
                metrics["httpclient_timeouts"] += 1
                return ERR_READ_TIMEOUT, {}, b"Connection reset"
            received = len(data)
            parsed = parser.execute(data, received)
            if parsed != received:
                return ERR_PARSE_ERROR, {}, b"Parse error"
            if parser.is_partial_body():
                response_body += [parser.recv_body()]
        code = parser.get_status_code()
        parsed_headers = parser.get_headers()
        logger.debug("HTTP Response %s", code)
        if 300 <= code <= 399 and follow_redirects:
            # Process redirects
            if max_redirects <= 0:
                return 404, {}, b"Redirect limit exceeded"
            new_url = parsed_headers.get("Location")
            if not new_url:
                return ERR_PARSE_ERROR, {}, b"No Location header"
            # Location may be relative to the requested URL
            from urllib.parse import urljoin

            new_url = urljoin(str(url), new_url)
            logger.debug("HTTP redirect %s %s", code, new_url)
            return await fetch(
                new_url,
                method="GET",
                headers=headers,
                connect_timeout=connect_timeout,
                request_timeout=request_timeout,
                resolver=resolver,
                max_buffer_size=max_buffer_size,
                follow_redirects=follow_redirects,
                max_redirects=max_redirects - 1,
                validate_cert=validate_cert,
                allow_proxy=allow_proxy,
                proxies=proxies,
            )
        # @todo: Process gzip and deflate Content-Encoding
        return code, parsed_headers, b"".join(response_body)
    finally:
        if writer:
            writer.close()
            try:
                await writer.wait_closed()
            except ConnectionResetError:
                pass
357
358


Dmitry Volodin's avatar
Dmitry Volodin committed
359
def fetch_sync(
    url: str,
    method: str = "GET",
    headers=None,
    body: Optional[bytes] = None,
    connect_timeout=DEFAULT_CONNECT_TIMEOUT,
    request_timeout=DEFAULT_REQUEST_TIMEOUT,
    resolver=resolve,
    max_buffer_size=DEFAULT_BUFFER_SIZE,
    follow_redirects: bool = False,
    max_redirects=DEFAULT_MAX_REDIRECTS,
    validate_cert=config.http_client.validate_certs,
    allow_proxy: bool = False,
    proxies=None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    content_encoding: Optional[str] = None,
    eof_mark: Optional[bytes] = None,
) -> Tuple[int, Dict[str, Any], bytes]:
    """
    Synchronous version of `fetch`.

    Accepts the same parameters as `fetch`, passes them through
    unchanged and runs the coroutine via `run_sync`.

    :return: code, headers, body
    """

    async def _fetch():
        return await fetch(
            url,
            method=method,
            headers=headers,
            body=body,
            connect_timeout=connect_timeout,
            request_timeout=request_timeout,
            resolver=resolver,
            max_buffer_size=max_buffer_size,
            follow_redirects=follow_redirects,
            max_redirects=max_redirects,
            validate_cert=validate_cert,
            allow_proxy=allow_proxy,
            proxies=proxies,
            user=user,
            password=password,
            content_encoding=content_encoding,
            eof_mark=eof_mark,
        )

    return run_sync(_fetch)
400
401
402


def to32u(n):
    """
    Pack integer as unsigned 32-bit little-endian value.

    Value is taken modulo 2**32, as required for the size field
    of the gzip trailer, so larger values wrap instead of raising
    struct.error.

    :param n: Integer to pack
    :return: 4 bytes
    """
    return struct.pack("<L", n & 0xFFFFFFFF)