Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ private function readResponse(
while (null !== $chunk = $this->readChunk($timeout)) {
parseChunk:
$response = $parser->parse($chunk);

if ($response === null) {
if ($this->socket === null) {
throw new SocketException('Socket closed prior to response completion');
Expand Down Expand Up @@ -377,17 +378,20 @@ private function readResponse(
$bodyDeferredCancellation
));

[$reqTimeout, $explicitTimeout, $priorTimeout] = $this->determineKeepAliveTimeout($response);

// Read body async
EventLoop::queue(function () use (
$parser,
$request,
$response,
$reqTimeout,
$explicitTimeout,
$priorTimeout,
$bodyEmitter,
$trailersDeferred,
$originalCancellation,
$readingCancellation,
$bodyCancellation,
$stream,
$timeout,
&$trailers
) {
Expand Down Expand Up @@ -443,7 +447,14 @@ private function readResponse(
}
}

$timeout = $this->determineKeepAliveTimeout($response);
if ($explicitTimeout) {
$this->explicitTimeout = $explicitTimeout;
}
if ($priorTimeout !== null) {
$this->priorTimeout = $priorTimeout;
}

$timeout = $reqTimeout;

if ($timeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) {
$this->timeoutWatcher = EventLoop::delay($timeout, $this->close(...));
Expand Down Expand Up @@ -546,33 +557,30 @@ private function getRemainingTime(): float
return \max(0, $timestamp - now());
}

private function determineKeepAliveTimeout(Response $response): int
/** @return list{int, bool, ?int} */
private function determineKeepAliveTimeout(Response $response): array
{
$request = $response->getRequest();

$requestConnHeader = $request->getHeader('connection') ?? '';
$responseConnHeader = $response->getHeader('connection') ?? '';

if (!\strcasecmp($requestConnHeader, 'close')) {
return 0;
return [0, false, null];
}

if ($response->getProtocolVersion() === '1.0') {
return 0;
return [0, false, null];
}

if (!\strcasecmp($responseConnHeader, 'close')) {
return 0;
return [0, false, null];
}

$params = Http\parseMultipleHeaderFields($response, 'keep-alive')[0] ?? null;

$timeout = (int) ($params['timeout'] ?? $this->priorTimeout);
if (isset($params['timeout'])) {
$this->explicitTimeout = true;
}

return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
return [0, isset($params['timeout']), \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT)];
}

/**
Expand Down
36 changes: 24 additions & 12 deletions src/Connection/Internal/Http1Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use Amp\Http\HttpMessage;
use Amp\Http\HttpStatus;
use Amp\Http\InvalidHeaderException;
use WeakReference;

use function Amp\Http\Client\events;
use function Amp\Http\mapHeaderPairs;

Expand All @@ -39,7 +41,8 @@ final class Http1Parser
public const TRAILERS_START = 4;
public const TRAILERS = 5;

private ?Response $response = null;
/** @var ?WeakReference<Response> */
private ?WeakReference $response = null;

private int $state = self::AWAITING_HEADERS;

Expand Down Expand Up @@ -110,9 +113,11 @@ public function parse(?string $data = null): ?Response

if (!$this->bodyStarted && \in_array($this->state, [self::BODY_CHUNKS, self::BODY_IDENTITY, self::BODY_IDENTITY_EOF], true)) {
$this->bodyStarted = true;
$response = $this->response;
\assert($response !== null);
events()->responseBodyStart($this->request, $this->stream, $response);
$response = $this->response?->get();
if ($response !== null) {
events()->responseBodyStart($this->request, $this->stream, $response);
unset($response);
}
}

switch ($this->state) {
Expand Down Expand Up @@ -185,13 +190,15 @@ public function parse(?string $data = null): ?Response

events()->responseHeaderEnd($this->request, $this->stream, $response);

return $this->response = $response;
$this->response = WeakReference::create($response);
return $response;
}

body_identity:
{
if ($data !== null && $data !== '') {
events()->responseBodyProgress($this->request, $this->stream, $this->response);
if ($data !== null && $data !== '' && ($r = $this->response->get())) {
events()->responseBodyProgress($this->request, $this->stream, $r);
unset($r);
}

$bufferDataSize = \strlen($this->buffer);
Expand Down Expand Up @@ -219,8 +226,9 @@ public function parse(?string $data = null): ?Response

body_identity_eof:
{
if ($data !== null && $data !== '') {
events()->responseBodyProgress($this->request, $this->stream, $this->response);
if ($data !== null && $data !== '' && ($r = $this->response->get())) {
events()->responseBodyProgress($this->request, $this->stream, $r);
unset($r);
}

$this->addToBody($this->buffer);
Expand All @@ -230,8 +238,9 @@ public function parse(?string $data = null): ?Response

body_chunks:
{
if ($data !== null && $data !== '') {
events()->responseBodyProgress($this->request, $this->stream, $this->response);
if ($data !== null && $data !== '' && ($r = $this->response->get())) {
events()->responseBodyProgress($this->request, $this->stream, $r);
unset($r);
}

if ($this->parseChunkedBody()) {
Expand Down Expand Up @@ -272,7 +281,10 @@ public function parse(?string $data = null): ?Response

complete:
{
events()->responseBodyEnd($this->request, $this->stream, $this->response);
if ($r = $this->response->get()) {
events()->responseBodyEnd($this->request, $this->stream, $r);
unset($r);
}

$this->complete = true;

Expand Down
36 changes: 36 additions & 0 deletions test/Connection/ConnectionLimitingPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,42 @@ public function testSingleConnection(): void
]);
}

public function testSingleConnectionDoNotUseBody(): void
{
$client = (new HttpClientBuilder)
->usingPool(ConnectionLimitingPool::byAuthority(1))
->build();

$this->setMinimumRuntime(2);

$r1 = new Request('https://httpbin.org/delay/1');
$r1->setProtocolVersions(['1.1']);
$r2 = new Request('https://httpbin.org/delay/1');
$r2->setProtocolVersions(['1.1']);
Future\await([
async($client->request(...), $r1),
async($client->request(...), $r2),
]);
}

public function testSingleConnectionDoNotUseBodyHttp2(): void
{
$client = (new HttpClientBuilder)
->usingPool(ConnectionLimitingPool::byAuthority(1))
->build();

$this->setMinimumRuntime(1);

$r1 = new Request('https://httpbin.org/delay/1');
$r1->setProtocolVersions(['2']);
$r2 = new Request('https://httpbin.org/delay/1');
$r2->setProtocolVersions(['2']);
Future\await([
async($client->request(...), $r1),
async($client->request(...), $r2),
]);
}

public function testTwoConnections(): void
{
$client = (new HttpClientBuilder)
Expand Down
Loading