Enable parsing HAProxy's Proxy protocol for TCP Servers
Most of the time, Tornado runs an HTTP(S) server or similar and can sit behind something like nginx, which can supply X-Forwarded-For-style (X-*) headers so the server can recover the client's real IP address.
However, Tornado is also capable of running non-HTTP services over TCP; in these cases, some load balancers support passing the client IP information via the HAProxy PROXY protocol, as described here: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
It would be handy to have parsing of the PROXY protocol built into Tornado, so that the client's real IP address can be acquired in non-HTTP settings.
It would also be useful, specifically in the HTTP case, to autodetect the PROXY protocol (if configured to do so, of course) and update the request metadata accordingly, so that yet another nginx server or similar isn't required.
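For reference, version 1 of the protocol simply prepends a single human-readable line to the connection before any application data. The example from the spec, written here as a Python bytes literal, looks like:

b"PROXY TCP4 192.168.0.1 192.168.0.11 56324 443\r\nGET / HTTP/1.1\r\n..."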
I have tentative code that works for this, but it relies on a few hacks against Tornado's internals and assumes they keep working:
"""
proxy_protocol.py
Implements the HAProxy Proxy Protocol that passes the actual client
IP behind a load balancer to the server.
Note that the spec is defined here:
https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
from tornado import gen, httpserver, httputil, iostream, tcpserver
# Maximum number of bytes that should be allowed for the PROXY protocol v1
# line; the spec notes that a 108-byte buffer is always enough to store the
# whole line and a trailing zero.
MAX_PROXY_PROTO_BYTES = 108
class ProxyProtocolInvalidAddress(Exception):
""" Indicates that the proxy type was invalid.
The proxy protocol requires fields like: TCP4 or TCP6 (for version 1)
"""
pass
class ProxyProtocolInvalidLine(Exception):
""" Indicates that the proxy line as a whole was invalid. """
pass
class ProxyProtocolIncompleteLine(Exception):
""" Indicates that the proxy line is incomplete. """
pass
def parse_proxy_line(line):
"""
Parses the given line (string or sequence of bytes) for the client IP
and other fields passed through the proxy protocol.
This returns a tuple with elements as follows:
(1) Dictionary with the parsed IP addresses
(2) Index where the proxy protocol stopped parsing.
This throws exceptions if the proxy protocol could not be parsed.
"""
    # Accept both str and bytes; latin-1 gives a 1:1 byte-to-character
    # mapping, so the returned index is also valid for the original bytes.
    if isinstance(line, bytes):
        line = line.decode("latin-1")
    line_len = len(line)
if not line.startswith("PROXY "):
for i, ch in enumerate("PROXY "):
if i >= line_len:
raise ProxyProtocolIncompleteLine()
if line[i] != ch:
break
raise ProxyProtocolInvalidLine()
idx = line.find('\r\n')
if idx < 0:
if line_len >= MAX_PROXY_PROTO_BYTES:
raise ProxyProtocolInvalidLine()
raise ProxyProtocolIncompleteLine()
# Now, try and parse the proxy line as desired.
proxy_line = line[:idx]
fields = proxy_line.split(' ')
if len(fields) != 6:
raise ProxyProtocolInvalidLine()
# Check if the proxy line is for v1 of the protocol.
if fields[0] == "PROXY":
try:
if fields[1] == "TCP4" or fields[1] == "TCP6":
source_addr = fields[2]
dest_addr = fields[3]
source_port = int(fields[4])
dest_port = int(fields[5])
else:
raise ProxyProtocolInvalidAddress()
        except ValueError:
            raise ProxyProtocolInvalidAddress()
        # NOTE: This is subtle, but VERY IMPORTANT.
        # We MUST return the index at which parsing stopped, in the event
        # that we did not consume the whole input! Callers need this so
        # they can put back any bytes that were not part of the proxy line.
        #
        # In the current implementation that is *almost* the value of 'idx',
        # except that we add 2 to account for the trailing CRLF that we
        # strip off.
return {
'client_ip': source_addr,
'client_port': source_port,
'local_ip': dest_addr,
'local_port': dest_port
}, int(idx + 2)
raise ProxyProtocolInvalidLine()
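# Illustrative sanity check (not part of the module): feeding the parser the
# v1 example from the spec returns the address dict plus the offset of the
# first byte after the CRLF, e.g.
#   addr, idx = parse_proxy_line("PROXY TCP4 192.168.0.1 192.168.0.11 56324 443\r\nGET /")
#   # addr == {'client_ip': '192.168.0.1', 'client_port': 56324,
#   #          'local_ip': '192.168.0.11', 'local_port': 443}
#   # everything from index `idx` onward ("GET /") must be put back on the stream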
def putback_line_in_stream(line, stream):
"""
HACKY: This call reinjects the given line back into the stream.
@bdarnell, can you provide a cleaner way to do this?
This emulates a "putback" or "unget" operation. This is subject to
break, though, so it should be used with caution, since it uses
tornado internals.
"""
# line_len = len(line)
# stream._read_buffer_pos -= line_len
# stream._read_buffer_size += line_len
if not line:
return
stream._read_buffer[:0] = line
stream._read_buffer_size += len(line)
@gen.coroutine
def parse_proxy_line_async(stream):
    line = b''
bytes_remaining = MAX_PROXY_PROTO_BYTES
addr = None
idx = 0
# This "while" loop is necessary because Tornado does not (but it should!)
# support an option to `stream.read_until()` to not automatically close the
# connection if the maximum byte count is reached before finding the
# delimiter.
#
# Again, @bdarnell, can you provide a cleaner way to do this?
#
while True:
# Keep reading until we get the maximum number of bytes allowed.
# We will break out of this loop early if we identify that the
# parsed line doesn't imply the proxy protocol.
val = yield stream.read_bytes(bytes_remaining, partial=True)
line += val
bytes_remaining = MAX_PROXY_PROTO_BYTES - len(line)
try:
addr, idx = parse_proxy_line(line)
break
except ProxyProtocolIncompleteLine:
# If the line is incomplete, keep trying to read.
continue
except (ProxyProtocolInvalidLine, ProxyProtocolInvalidAddress):
idx = 0
break
# Reinsert any unparsed bytes back into the stream.
putback_line_in_stream(line[idx:], stream)
# Return the address we parsed.
    # `raise gen.Return(...)` keeps this compatible with older Tornado
    # versions; on newer Tornado/asyncio a plain `return addr` works.
raise gen.Return(addr)
class ProxyProtocolTCPServer(tcpserver.TCPServer):
"""
Wrapper for tornado.tcpserver.TCPServer that parses out the
HAProxy PROXY protocol and attaches the actual IP to the stream
as an attribute named 'proxy_addr'.
The proxy protocol spec is defined here:
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
@gen.coroutine
def handle_stream(self, stream, address):
try:
# Try and parse out the proxy line.
# If this call fails, then the bytes that were read on the stream
# will be re-added and addr will be 'None'.
addr = yield parse_proxy_line_async(stream)
if addr is not None:
stream.proxy_addr = addr
# Use our parsed address here.
super(ProxyProtocolTCPServer, self).handle_stream(stream, address)
except iostream.StreamClosedError:
pass
        except Exception:
            super(ProxyProtocolTCPServer, self).handle_stream(stream, address)
class ProxyProtocolAdapter(httputil.HTTPMessageDelegate, object):
"""
Implements a HTTPMessageDelegate that injects the real client IP into the
request context for each request.
    Basically, this class makes sure that the following handler code actually
    sees the correct client IP:
`
class SampleHandler(tornado.web.RequestHandler):
def post(self):
self.request.remote_ip
`
This works by intercepting the delegate that is called whenever a request
is made, then injecting the proper remote_ip (and other relevant fields).
When the request is finished, or stuff falls out of scope, this "undoes"
the injected changes so everything is consistent.
Note that this particular adapter only works for HTTP (and by extension
WebSocket) calls.
"""
def __init__(self, delegate, request_conn, proxy_ip=None):
self.connection = request_conn
self.delegate = delegate
self.proxy_ip = proxy_ip
def _apply_proxy_info(self):
self._orig_ip = self.connection.context.remote_ip
if self.proxy_ip:
self.connection.context.remote_ip = self.proxy_ip
def _undo_proxy_info(self):
self.connection.context.remote_ip = self._orig_ip
def headers_received(self, start_line, headers):
self._apply_proxy_info()
return self.delegate.headers_received(start_line, headers)
def data_received(self, chunk):
return self.delegate.data_received(chunk)
def finish(self):
self.delegate.finish()
self._undo_proxy_info()
def on_connection_close(self):
self.delegate.on_connection_close()
self._undo_proxy_info()
class ProxyProtocolHTTPServer(httpserver.HTTPServer):
"""
Wrapper for tornado.httpserver.HTTPServer that parses out the
HAProxy PROXY protocol and sets the appropriate remote_ip as
defined here:
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
@gen.coroutine
def handle_stream(self, stream, address):
"""
Creates the stream for this connection.
This parses out the proxy protocol from the request and injects the
relevant fields onto the given stream object at: `stream.proxy_addr`
"""
try:
addr = yield parse_proxy_line_async(stream)
if addr is not None:
stream.proxy_addr = addr
# Use our parsed address here.
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
except iostream.StreamClosedError:
pass
        except Exception:
            super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
def start_request(self, server_conn, request_conn):
"""
Override so that we can inject the real ip for `self.request.remote_ip`
in each request.
"""
delegate = super(ProxyProtocolHTTPServer, self).start_request(server_conn, request_conn)
# If the real client IP was parsed via the proxy protocol, the address would
# have been attached to the socket in the 'proxy_addr' attribute, so we try
# to fetch it. If we couldn't, just ignore it and default to the usual
# behavior. (Should we log it instead?)
try:
proxy_addr = server_conn.stream.proxy_addr
addr = proxy_addr['client_ip']
return ProxyProtocolAdapter(delegate, request_conn, proxy_ip=addr)
        except (AttributeError, KeyError):
            return delegate
This code supports parsing the PROXY protocol (version 1), though it could be extended for version 2. (Also, the IP addresses and ports could probably be validated to make sure they are actually well-formed.)
@bdarnell , assuming you provide a clean way to put bytes back into the stream as documented in the code comments (and maybe also a slightly cleaner way to handle the max-bytes case without blindly closing the connection), this code should transparently handle HTTP connections behind a load balancer that may or may not have the PROXY protocol (v1) enabled. I've even tested this by toggling the PROXY protocol on a load balancer, with and without it enabled, without ever shutting down the underlying server.
This makes migrations for HTTP (and potentially other TCP) servers substantially easier, without complicating a whole bunch of infrastructure just to serve the PROXY protocol on a different port. It CAN be done, and Tornado should at least provide the framework hooks to make it possible, even if you don't want to implement it directly in Tornado itself.
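For context, here is roughly how the HTTP wrapper above gets wired up in practice (a minimal sketch only; the handler and port are placeholders, and it assumes the module is importable as proxy_protocol):

import tornado.ioloop
import tornado.web

from proxy_protocol import ProxyProtocolHTTPServer

class EchoIPHandler(tornado.web.RequestHandler):
    def get(self):
        # With the adapter installed, this is the address carried in the
        # PROXY line when one was present, or the socket peer otherwise.
        self.write(self.request.remote_ip)

app = tornado.web.Application([(r"/", EchoIPHandler)])
server = ProxyProtocolHTTPServer(app)
server.listen(8888)
tornado.ioloop.IOLoop.current().start()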
Hello, I recently worked with Tornado and HAProxy, so here is my solution for PROXY protocol version 2 (PPv2):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
proxy_protocol.py
Implements the HAProxy Proxy Protocol that passes the actual client
IP behind a load balancer to the server.
Note that the spec is defined here:
https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
from tornado import httputil
from tornado import iostream
from tornado import httpserver
from typing import Dict
from typing import Tuple
from typing import Union
from typing import Optional
from typing import Awaitable
from ipaddress import IPv4Address
from ipaddress import IPv6Address
# Proxy protocol v1
# Set the maximum number of bytes that should be allowed in the PROXY
# protocol v1 line; the spec notes that a 108-byte buffer is always enough
# to store the whole line and a trailing zero.
MAX_PROXY_PROTO_BYTES = 108
PROXY_PROTO_DELIMITER = "\r\n"
# Proxy protocol v2
PROXY_PROTO_V2_SIGNATURE = b"\r\n\r\n\x00\r\nQUIT\n"
PROXY_PROTO_V2_HEADER_LENGTH = 16
# Protocol Version and Command
# Command
LOCAL = 0
PROXY = 1
PVC = {32: ("OK", LOCAL), # 00100000 \x20
33: ("OK", PROXY)} # 00100001 \x21
# Address Family and Transport_Protocol
# Address Family
AF_UNSPEC = 0
AF_INET = 1
AF_INET6 = 2
AF_UNIX = 3
# Transport_Protocol
UNSPEC = 0
STREAM = 1
DGRAM = 2
AFTP = {0: (AF_UNSPEC, UNSPEC), # 00000000 \x00
1: (AF_UNSPEC, STREAM), # 00000001 \x01
2: (AF_UNSPEC, DGRAM), # 00000010 \x02
16: (AF_INET, UNSPEC), # 00010000 \x10
17: (AF_INET, STREAM), # 00010001 \x11
18: (AF_INET, DGRAM), # 00010010 \x12
32: (AF_INET6, UNSPEC), # 00100000 \x20
33: (AF_INET6, STREAM), # 00100001 \x21
34: (AF_INET6, DGRAM), # 00100010 \x22
48: (AF_UNIX, UNSPEC), # 00110000 \x30
49: (AF_UNIX, STREAM), # 00110001 \x31
50: (AF_UNIX, DGRAM)} # 00110010 \x32
class ProxyProtocolInvalidAddress(Exception):
"""
Indicates that the proxy type was invalid.
The proxy protocol requires fields like: TCP4 or TCP6 (for version 1)
"""
pass
class ProxyProtocolInvalidLine(Exception):
"""Indicates that the proxy line as a whole was invalid."""
    def __init__(self, line: Union[str, bytes] = "") -> None:
super(ProxyProtocolInvalidLine, self).__init__()
self.line = line
class ProxyProtocolSocketIsNotSupported(Exception):
"""Indicates that we do not support connection to proxy via unix socket."""
pass
def parse_proxy_line(line: bytes) -> Dict:
"""
Parses the given line (string or sequence of bytes) for the client IP
and other fields passed through the proxy protocol.
This returns dictionary with the parsed IP addresses
This throws exceptions if the proxy protocol could not be parsed.
"""
line = line.decode("utf-8")
# Remove '\r\n' delimiter and parse proxy protocol line.
fields = line[:-2].split(' ')
if len(fields) != 6:
raise ProxyProtocolInvalidLine(line)
# Check if the proxy line is for v1 of the protocol.
if fields[0] == "PROXY":
try:
if fields[1] == "TCP4" or fields[1] == "TCP6":
source_addr = fields[2]
dest_addr = fields[3]
source_port = int(fields[4])
dest_port = int(fields[5])
else:
raise ProxyProtocolInvalidAddress()
except Exception:
raise ProxyProtocolInvalidAddress()
return {'client_ip': source_addr,
'client_port': source_port,
'local_ip': dest_addr,
'local_port': dest_port}
raise ProxyProtocolInvalidLine(line)
def parse_proxy_v2_header(line: bytes) -> Dict:
"""
Parses the given line (string or sequence of bytes) for the client IP
and other fields passed through the proxy protocol.
This returns dictionary with the parsed IP addresses
This throws exceptions if the proxy protocol could not be parsed.
"""
# First 12 bytes contains proxy protocol signature
if line[:12] != PROXY_PROTO_V2_SIGNATURE:
raise ProxyProtocolInvalidLine(line)
# The 13th byte contains proxy version and command(pvc)
# The highest four bits contains the version. As of this specification,
# it must always be sent as \x2 and the receiver must only accept this
# value. The lowest four bits represents the command
if line[12] in PVC:
_, command = PVC[line[12]]
else:
raise ProxyProtocolInvalidLine(line)
# The 14th byte contains transport protocol and address family.
if line[13] in AFTP:
af, tp = AFTP[line[13]]
else:
raise ProxyProtocolInvalidLine(line)
    # The 15th and 16th bytes are the address length in bytes, in network
    # (big-endian) byte order.
length = line[15] | line[14] << 8
return {"command": command,
"address_family": af,
"transport_protocol": tp,
"length": length}
def parse_proxy_v2_addr(header: Dict, line: bytes) -> Dict:
"""
Parse address part of stream into individual ips/ports
"""
if header["address_family"] == AF_UNSPEC and \
header["transport_protocol"] == UNSPEC:
if header["command"] == LOCAL:
            # In this case we ignore the rest of the bytes.
return {'client_ip': "unknown",
'client_port': 0,
'local_ip': "unknown",
'local_port': 0}
else:
raise ProxyProtocolInvalidLine()
elif header["address_family"] == AF_UNSPEC or \
header["transport_protocol"] == UNSPEC:
raise ProxyProtocolInvalidLine()
if header["address_family"] == AF_INET and \
header["transport_protocol"] == STREAM:
# TCP connection over ipv4 / Address length is 2*4+2*2=12 bytes.
result = {"client_ip": str(IPv4Address(line[0:4])),
"local_ip": str(IPv4Address(line[4:8])),
"client_port": line[9] | line[8] << 8,
"local_port": line[11] | line[10] << 8}
elif header["address_family"] == AF_INET and \
header["transport_protocol"] == DGRAM:
# UDP connection over ipv4 / Address length is 2*4+2*2=12 bytes.
result = {"client_ip": str(IPv4Address(line[0:4])),
"local_ip": str(IPv4Address(line[4:8])),
"client_port": line[9] | line[8] << 8,
"local_port": line[11] | line[10] << 8}
elif header["address_family"] == AF_INET6 and \
header["transport_protocol"] == STREAM:
# TCP connection over ipv6 / Address length is 2*16+2*2=36 bytes.
result = {"client_ip": str(IPv6Address(line[0:16])),
"local_ip": str(IPv6Address(line[16:32])),
"client_port": line[33] | line[32] << 8,
"local_port": line[35] | line[34] << 8}
elif header["address_family"] == AF_INET6 and \
header["transport_protocol"] == DGRAM:
# UDP connection over ipv6 / Address length is 2*16+2*2=36 bytes.
result = {"client_ip": str(IPv6Address(line[0:16])),
"local_ip": str(IPv6Address(line[16:32])),
"client_port": line[33] | line[32] << 8,
"local_port": line[35] | line[34] << 8}
elif header["address_family"] == AF_UNIX and \
header["transport_protocol"] == STREAM:
        # Stream connection over a UNIX socket / Address length is 2*108=216 bytes.
# This variant is not supported yet
# result = {"client_ip": line[0:108],
# "local_ip": line[108:216]}
raise ProxyProtocolSocketIsNotSupported()
elif header["address_family"] == AF_UNIX and \
header["transport_protocol"] == DGRAM:
        # Datagram connection over a UNIX socket / Address length is 2*108=216 bytes.
# This variant is not supported yet
# result = {"client_ip": line[0:108],
# "local_ip": line[108:216]}
raise ProxyProtocolSocketIsNotSupported()
else:
raise ProxyProtocolInvalidLine(line)
return result
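# Illustrative example: for AF_INET/STREAM the 12-byte address block
#   b"\xc0\xa8\x00\x01" + b"\xc0\xa8\x00\x0b" + b"\xdc\x04" + b"\x01\xbb"
# parses to client 192.168.0.1:56324 and local 192.168.0.11:443
# (two 4-byte addresses followed by two 2-byte big-endian ports).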
class ProxyProtocolAdapter(httputil.HTTPMessageDelegate):
"""
Implements a HTTPMessageDelegate that injects the real client IP into the
request context for each request.
    Basically, this class makes sure that the following handler code actually
    sees the correct client IP:
`
class SampleHandler(tornado.web.RequestHandler):
def post(self):
self.request.remote_ip
`
This works by intercepting the delegate that is called whenever a request
is made, then injecting the proper remote_ip (and other relevant fields).
When the request is finished, or stuff falls out of scope, this "undoes"
the injected changes so everything is consistent.
Note that this particular adapter only works for HTTP (and by extension
WebSocket) calls.
"""
def __init__(self, delegate: httputil.HTTPMessageDelegate,
request_conn: httputil.HTTPConnection,
proxy_ip: str) -> None:
self.connection = request_conn
self.delegate = delegate
self.proxy_ip = proxy_ip
def _apply_proxy_info(self) -> None:
self._orig_ip = self.connection.context.remote_ip
if self.proxy_ip:
self.connection.context.remote_ip = self.proxy_ip
def _undo_proxy_info(self) -> None:
self.connection.context.remote_ip = self._orig_ip
def headers_received(self, start_line: Union[httputil.RequestStartLine,
httputil.ResponseStartLine],
headers: httputil.HTTPHeaders
) -> Optional[Awaitable[None]]:
self._apply_proxy_info()
return self.delegate.headers_received(start_line, headers)
def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
return self.delegate.data_received(chunk)
def finish(self) -> None:
self.delegate.finish()
self._undo_proxy_info()
def on_connection_close(self) -> None:
self.delegate.on_connection_close()
self._undo_proxy_info()
class ProxyProtocolHTTPServer(httpserver.HTTPServer):
"""
Wrapper for tornado.httpserver.HTTPServer that parses out the
HAProxy PROXY protocol and sets the appropriate remote_ip as
defined here:
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
async def handle_stream(self, stream: iostream.IOStream,
address: Tuple) -> None:
"""
Creates the stream for this connection.
This parses out the proxy protocol from the request and injects the
relevant fields onto the given stream object at: `stream.proxy_addr`
"""
try:
line = b''
addr = None
            # The proxy protocol part ends with a \r\n delimiter and
            # should not be longer than MAX_PROXY_PROTO_BYTES (108) bytes.
line = await stream.read_until_regex(
regex=PROXY_PROTO_DELIMITER.encode("utf-8"),
max_bytes=MAX_PROXY_PROTO_BYTES)
addr = parse_proxy_line(line)
if addr is not None:
stream.proxy_addr = addr
# Use our parsed address here.
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
except iostream.StreamClosedError:
pass
except Exception:
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
def start_request(self, server_conn: object,
request_conn: httputil.HTTPConnection
) -> httputil.HTTPMessageDelegate:
"""
Override so that we can inject the real ip for `self.request.remote_ip`
in each request.
"""
delegate = super(ProxyProtocolHTTPServer,
self).start_request(server_conn, request_conn)
# If the real client IP was parsed via the proxy protocol, the address
# would have been attached to the socket in the 'proxy_addr' attribute,
# so we try to fetch it. If we couldn't, just ignore it and default to
# the usual behavior. (Should we log it instead?)
try:
proxy_addr = server_conn.stream.proxy_addr
addr = proxy_addr['client_ip']
return ProxyProtocolAdapter(delegate, request_conn, proxy_ip=addr)
except Exception:
return delegate
class ProxyProtocolV2HTTPServer(ProxyProtocolHTTPServer):
"""
Wrapper for tornado.httpserver.HTTPServer that parses out the
HAProxy PROXY protocol and sets the appropriate remote_ip as
defined here:
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
async def handle_stream(self, stream: iostream.IOStream,
address: Tuple) -> None:
"""
Creates the stream for this connection.
This parses out the proxy protocol from the request and injects the
relevant fields onto the given stream object at: `stream.proxy_addr`
"""
try:
line = b''
addr = None
# Read and parse proxy protocol header
line = await stream.read_bytes(PROXY_PROTO_V2_HEADER_LENGTH)
header = parse_proxy_v2_header(line)
# Read and parse client ips
line = await stream.read_bytes(header["length"])
addr = parse_proxy_v2_addr(header, line)
if addr is not None:
stream.proxy_addr = addr
            # Call handle_stream on the parent of ProxyProtocolHTTPServer
            # (plain HTTPServer), skipping the v1 parsing above.
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
except iostream.StreamClosedError:
pass
except ProxyProtocolInvalidLine as e:
print("Invalid proxy protocol header: %s" % str(e.line))
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
except ProxyProtocolSocketIsNotSupported:
print("Proxy protocol: Unix socket is not supported")
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
except Exception as e:
print("Exception %s caught" % str(e))
super(ProxyProtocolHTTPServer, self).handle_stream(stream, address)
This solution originates in @eulersIDcrisis's code, but I altered it a little bit, and it works fine with Tornado (6.2.0-3) and Python (3.11.2). Not all features are implemented (I simply didn't need them).
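For anyone who wants to try it, here is a minimal way to run the v2 server (a sketch only; the handler and port are placeholders, and it assumes HAProxy is configured with the send-proxy-v2 option on the corresponding server line):

import asyncio

import tornado.web

from proxy_protocol import ProxyProtocolV2HTTPServer

class EchoIPHandler(tornado.web.RequestHandler):
    def get(self) -> None:
        # remote_ip reflects the address carried in the PPv2 header.
        self.write(self.request.remote_ip)

async def main() -> None:
    app = tornado.web.Application([(r"/", EchoIPHandler)])
    server = ProxyProtocolV2HTTPServer(app)
    server.listen(8888)
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())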