SOCKS5: support looking up names remotely#2666
Conversation
5e712e4 to
8771bee
Compare
8771bee to
d83a269
Compare
kafka/conn.py
Outdated
| else: | ||
| self._sock = self._socks5_proxy.socket(None, socket.SOCK_STREAM) | ||
| self._sock_afi = None | ||
| self._sock_addr = [self.host.encode('ascii'), self.port] |
There was a problem hiding this comment.
recommend you defer str.encode() until needed by socks code.
There was a problem hiding this comment.
I moved this as you suggested. Let me know what you think!
kafka/conn.py
Outdated
| return self.state | ||
| else: | ||
| if self.config["socks5_proxy"] is None or not self._socks5_proxy.use_remote_lookup(): | ||
| next_lookup = self._next_afi_sockaddr() |
There was a problem hiding this comment.
Would it be simpler to move the self._socks5_proxy.use_remote_lookup() inside of _next_afi_sockaddr() ? For socks proxy w/ remote_lookup, it should return (socket.AF_UNSPEC, (self.host, self.port)). With that change I think most of these other changes would be unnecessary.
There was a problem hiding this comment.
Thanks, that indeed reduces the changes quite a bit! Let me know what you think of the current version!
kafka/conn.py
Outdated
| """ | ||
| if self.disconnected() or self.connecting(): | ||
| if len(self._gai) > 0: | ||
| if len(self._gai) > 0 or self._socks5_proxy.use_remote_lookup(): |
There was a problem hiding this comment.
won't this crash if/when self._socks5_proxy is None and self._gai is empty?
There was a problem hiding this comment.
You're completely right. I added some extra logic now to check whether self._socks5_proxy is set.
kafka/conn.py
Outdated
|
|
||
| self._sock.setblocking(False) | ||
| self.config['state_change_callback'](self.node_id, self._sock, self) | ||
| if self._sock_afi != None: |
There was a problem hiding this comment.
generally should check is or is not for None, not equality.
There was a problem hiding this comment.
I got rid of the None variant for the socket family now.
kafka/conn.py
Outdated
| return self._api_version | ||
|
|
||
| def __str__(self): | ||
| if self._sock_afi != None: |
There was a problem hiding this comment.
implied suggestion above is to use socket.AF_UNSPEC instead of None
There was a problem hiding this comment.
Yep, makes sense :)
|
Many thanks for your review! I'll rework and push a better commit! |
d83a269 to
f3d9fb2
Compare
| self._sock_afi, self._sock_addr = next_lookup | ||
| try: | ||
| if self.config["socks5_proxy"] is not None: | ||
| self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) |
There was a problem hiding this comment.
I was wondering whether I could avoid this change. But next_lookup = self._next_afi_sockaddr() on line 377 will fail then because it uses the initialized self._socks5_proxy to check if it should look up hostnames remotely.
Let me know if you prefer this to be handled differently than it's now.
…ks5_proxy in close()
…xy init; consolidate buffer_out; fix port H encoding
|
I added a couple commits to fix a few things. Let me know if it looks ok. I did some basic local testing to validate. |
kafka/conn.py
Outdated
| if self.disconnected() or self.connecting(): | ||
| if len(self._gai) > 0: | ||
| return 0 | ||
| elif Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): |
There was a problem hiding this comment.
Socks5Wrapper.use_remote_lookup(None) returns False, so this works in the default case.
| "!{}s".format(addr_len), | ||
| socket.inet_pton(self._target_afi, addr[0]), | ||
| ) | ||
| self._buffer_out += struct.pack("!H", addr[1]) # port |
There was a problem hiding this comment.
I changed the pack format from h to H. Unsigned int is required for ports >32k
|
The new 2.3.0 release is working great! Many thanks for incorporating this!! |
Hi all, I'm dealing with a Kafka cluster behind a SOCKS proxy where the broker names cannot locally be resolved.
This PR implements resolving the name remotely. It is probably rather hacky though, so I appreciate any feedback to improve it and will make changes accordingly.