<?php

/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

// Listing captured from vendor/symfony/amqp-messenger/Transport/Connection.php (reported at line 205).

namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;

use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\TransportException;

/**
 * An AMQP connection.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 *
 * @final
 */
class Connection
{
    /**
     * Queue arguments that are cast to integers by normalizeQueueArguments().
     */
    private const ARGUMENTS_AS_INTEGER = [
        'x-delay',
        'x-expires',
        'x-max-length',
        'x-max-length-bytes',
        'x-max-priority',
        'x-message-ttl',
    ];

    /**
     * @see https://github.com/php-amqp/php-amqp/blob/master/amqp_connection_resource.h
     */
    private const AVAILABLE_OPTIONS = [
        'host',
        'port',
        'vhost',
        'user',
        'login',
        'password',
        'queues',
        'exchange',
        'delay',
        'auto_setup',
        'prefetch_count',
        'retry',
        'persistent',
        'frame_max',
        'channel_max',
        'heartbeat',
        'read_timeout',
        'write_timeout',
        'confirm_timeout',
        'connect_timeout',
        'rpc_timeout',
        'cacert',
        'cert',
        'key',
        'verify',
        'sasl_method',
    ];

    private const AVAILABLE_QUEUE_OPTIONS = [
        'binding_keys',
        'binding_arguments',
        'flags',
        'arguments',
    ];

    private const AVAILABLE_EXCHANGE_OPTIONS = [
        'name',
        'type',
        'default_publish_routing_key',
        'flags',
        'arguments',
    ];

    private $connectionOptions;
    private $exchangeOptions;
    private $queuesOptions;
    private $amqpFactory;
    private $autoSetupExchange;
    private $autoSetupDelayExchange;

    /**
     * @var \AMQPChannel|null
     */
    private $amqpChannel;

    /**
     * @var \AMQPExchange|null
     */
    private $amqpExchange;

    /**
     * @var \AMQPQueue[]|null
     */
    private $amqpQueues = [];

    /**
     * @var \AMQPExchange|null
     */
    private $amqpDelayExchange;

    /**
     * Unix timestamp of the last connect or publish, used by channel() for the heartbeat check.
     *
     * @var int
     */
    private $lastActivityTime = 0;

    /**
     * @param array $connectionOptions Connection-level options, merged over the default "delay" settings
     * @param array $exchangeOptions   Options of the main exchange ("name", "type", "flags", "arguments", ...)
     * @param array $queuesOptions     Queue options keyed by queue name
     *
     * @throws LogicException When the "amqp" PHP extension is not loaded
     */
    public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, ?AmqpFactory $amqpFactory = null)
    {
        if (!\extension_loaded('amqp')) {
            throw new LogicException(sprintf('You cannot use the "%s" as the "amqp" extension is not installed.', __CLASS__));
        }

        $this->connectionOptions = array_replace_recursive([
            'delay' => [
                'exchange_name' => 'delays',
                'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
            ],
        ], $connectionOptions);

        // Both flags start from the same option; each is switched off once its own setup has run.
        $this->autoSetupExchange = $this->autoSetupDelayExchange = $connectionOptions['auto_setup'] ?? true;
        $this->exchangeOptions = $exchangeOptions;
        $this->queuesOptions = $queuesOptions;
        $this->amqpFactory = $amqpFactory ?? new AmqpFactory();
    }

    /**
     * Creates a connection based on the DSN and options.
     *
     * Available options:
     *
     *   * host: Hostname of the AMQP service
     *   * port: Port of the AMQP service
     *   * vhost: Virtual Host to use with the AMQP service
     *   * user|login: Username to use to connect the AMQP service
     *   * password: Password to use to connect to the AMQP service
     *   * read_timeout: Timeout for incoming activity. Note: 0 or greater seconds. May be fractional.
     *   * write_timeout: Timeout for outgoing activity. Note: 0 or greater seconds. May be fractional.
     *   * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
     *   * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
     *   * queues[name]: An array of queues, keyed by the name
     *     * binding_keys: The binding keys (if any) to bind to this queue
     *     * binding_arguments: Arguments to be used while binding the queue.
     *     * flags: Queue flags (Default: AMQP_DURABLE)
     *     * arguments: Extra arguments
     *   * exchange:
     *     * name: Name of the exchange
     *     * type: Type of exchange (Default: fanout)
     *     * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
     *     * flags: Exchange flags (Default: AMQP_DURABLE)
     *     * arguments: Extra arguments
     *   * delay:
     *     * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
     *     * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
     *   * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
     *
     *   * Connection tuning options (see http://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.tune for details):
     *     * channel_max: Specifies highest channel number that the server permits. 0 means standard extension limit
     *       (see PHP_AMQP_MAX_CHANNELS constant)
     *     * frame_max: The largest frame size that the server proposes for the connection, including frame header
     *       and end-byte. 0 means standard extension limit (depends on librabbitmq default frame size limit)
     *     * heartbeat: The delay, in seconds, of the connection heartbeat that the server wants.
     *       0 means the server does not want a heartbeat. Note, librabbitmq has limited heartbeat support,
     *       which means heartbeats checked only during blocking calls.
     *
     *   TLS support (see https://www.rabbitmq.com/ssl.html for details):
     *     * cacert: Path to the CA cert file in PEM format.
     *     * cert: Path to the client certificate in PEM format.
     *     * key: Path to the client key in PEM format.
     *     * verify: Enable or disable peer verification. If peer verification is enabled then the common name in the
     *       server certificate must match the server name. Peer verification is enabled by default.
     *
     * @throws InvalidArgumentException When the DSN cannot be parsed, a queue argument is not numeric,
     *                                  or an "amqps://" DSN is used without any CA certificate configured
     */
    public static function fromDsn(string $dsn, array $options = [], ?AmqpFactory $amqpFactory = null): self
    {
        if (false === $params = parse_url($dsn)) {
            // this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
            if (!\in_array($dsn, ['amqp://', 'amqps://'], true)) {
                throw new InvalidArgumentException('The given AMQP DSN is invalid.');
            }

            $params = [];
        }

        $useAmqps = 0 === strpos($dsn, 'amqps://');
        // The DSN path is "/<vhost>/<exchange name>".
        $pathParts = isset($params['path']) ? explode('/', trim($params['path'], '/')) : [];
        $exchangeName = $pathParts[1] ?? 'messages';
        parse_str($params['query'] ?? '', $parsedQuery);
        $port = $useAmqps ? 5671 : 5672;

        // Precedence, lowest to highest: DSN-derived defaults, $options, DSN query string.
        $amqpOptions = array_replace_recursive([
            'host' => $params['host'] ?? 'localhost',
            'port' => $params['port'] ?? $port,
            'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
            'exchange' => [
                'name' => $exchangeName,
            ],
        ], $options, $parsedQuery);

        self::validateOptions($amqpOptions);

        if (isset($params['user'])) {
            $amqpOptions['login'] = rawurldecode($params['user']);
        }

        if (isset($params['pass'])) {
            $amqpOptions['password'] = rawurldecode($params['pass']);
        }

        if (!isset($amqpOptions['queues'])) {
            // Without explicit queues, a single queue named after the exchange is used.
            $amqpOptions['queues'][$exchangeName] = [];
        }

        $exchangeOptions = $amqpOptions['exchange'];
        $queuesOptions = $amqpOptions['queues'];
        unset($amqpOptions['queues'], $amqpOptions['exchange']);

        if (isset($amqpOptions['auto_setup'])) {
            $amqpOptions['auto_setup'] = filter_var($amqpOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
        }

        $queuesOptions = array_map(function ($queueOptions) {
            if (!\is_array($queueOptions)) {
                $queueOptions = [];
            }

            if (\is_array($queueOptions['arguments'] ?? false)) {
                $queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
            }

            return $queueOptions;
        }, $queuesOptions);

        if (!$useAmqps) {
            // TLS-only options are dropped for plain "amqp://" DSNs.
            unset($amqpOptions['cacert'], $amqpOptions['cert'], $amqpOptions['key'], $amqpOptions['verify']);
        }

        if ($useAmqps && !self::hasCaCertConfigured($amqpOptions)) {
            throw new InvalidArgumentException('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.');
        }

        return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
    }

    /**
     * Triggers deprecation notices for unknown connection, queue and exchange
     * options, and for the "prefetch_count" option.
     */
    private static function validateOptions(array $options): void
    {
        if (0 < \count($invalidOptions = array_diff(array_keys($options), self::AVAILABLE_OPTIONS))) {
            trigger_deprecation('symfony/messenger', '5.1', 'Invalid option(s) "%s" passed to the AMQP Messenger transport. Passing invalid options is deprecated.', implode('", "', $invalidOptions));
        }

        if (isset($options['prefetch_count'])) {
            trigger_deprecation('symfony/messenger', '5.3', 'The "prefetch_count" option passed to the AMQP Messenger transport has no effect and should not be used.');
        }

        if (\is_array($options['queues'] ?? false)) {
            foreach ($options['queues'] as $queue) {
                if (!\is_array($queue)) {
                    continue;
                }

                if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
                    trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
                }
            }
        }

        if (\is_array($options['exchange'] ?? false)
            && 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) {
            trigger_deprecation('symfony/messenger', '5.1', 'Invalid exchange option(s) "%s" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated.', implode('", "', $invalidExchangeOptions));
        }
    }

    /**
     * Casts the queue arguments listed in ARGUMENTS_AS_INTEGER to int; other
     * arguments are returned untouched.
     *
     * @throws InvalidArgumentException When one of those arguments is present but not numeric
     */
    private static function normalizeQueueArguments(array $arguments): array
    {
        foreach (self::ARGUMENTS_AS_INTEGER as $key) {
            if (!\array_key_exists($key, $arguments)) {
                continue;
            }

            if (!is_numeric($arguments[$key])) {
                throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, get_debug_type($arguments[$key])));
            }

            $arguments[$key] = (int) $arguments[$key];
        }

        return $arguments;
    }

    /**
     * Tells whether a CA certificate is available, either through a non-empty
     * "cacert" option or through the "amqp.cacert" ini setting.
     */
    private static function hasCaCertConfigured(array $amqpOptions): bool
    {
        return (isset($amqpOptions['cacert']) && '' !== $amqpOptions['cacert']) || '' !== \ini_get('amqp.cacert');
    }

    /**
     * Publishes a message on the main exchange, or through a delay queue when
     * $delayInMs is not 0.
     *
     * The publication is retried when an \AMQPConnectionException occurs
     * (see withConnectionExceptionRetry()).
     *
     * @throws \AMQPException
     */
    public function publish(string $body, array $headers = [], int $delayInMs = 0, ?AmqpStamp $amqpStamp = null): void
    {
        $this->clearWhenDisconnected();

        if ($this->autoSetupExchange) {
            $this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
        }

        $this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
            if (0 !== $delayInMs) {
                $this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);

                return;
            }

            $this->publishOnExchange(
                $this->exchange(),
                $body,
                $this->getRoutingKeyForMessage($amqpStamp),
                $headers,
                $amqpStamp
            );
        });
    }

    /**
     * Returns an approximate count of the messages in defined queues.
     */
    public function countMessagesInQueues(): int
    {
        return array_sum(array_map(function ($queueName) {
            return $this->queue($queueName)->declareQueue();
        }, $this->getQueueNames()));
    }

    /**
     * Declares the delay queue matching the given delay and publishes the
     * message on the delay exchange.
     *
     * @throws \AMQPException
     */
    private function publishWithDelay(string $body, array $headers, int $delay, ?AmqpStamp $amqpStamp = null)
    {
        $routingKey = $this->getRoutingKeyForMessage($amqpStamp);
        $isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;

        $this->setupDelay($delay, $routingKey, $isRetryAttempt);

        $this->publishOnExchange(
            $this->getDelayExchange(),
            $body,
            $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
            $headers,
            $amqpStamp
        );
    }

    /**
     * Publishes the body on the given exchange, then waits for the broker
     * confirmation when a "confirm_timeout" option is set.
     *
     * Stamp attributes are used as a base: $headers are merged over the stamp
     * headers, and "delivery_mode" (2) and "timestamp" (now) are only set when
     * the stamp did not define them.
     */
    private function publishOnExchange(\AMQPExchange $exchange, string $body, ?string $routingKey = null, array $headers = [], ?AmqpStamp $amqpStamp = null)
    {
        $attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
        $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
        $attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
        $attributes['timestamp'] = $attributes['timestamp'] ?? time();

        $this->lastActivityTime = time();

        $exchange->publish(
            $body,
            $routingKey,
            $amqpStamp ? $amqpStamp->getFlags() : \AMQP_NOPARAM,
            $attributes
        );

        if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
            $this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']);
        }
    }

    /**
     * Declares the delay exchange (once, when auto-setup is enabled) and the
     * delay queue for the given delay, and binds the queue to the exchange.
     */
    private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
    {
        if ($this->autoSetupDelayExchange) {
            $this->setupDelayExchange();
        }

        $queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
        $queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
        $queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
    }

    /**
     * Returns the lazily-created, durable, direct exchange used for delayed messages.
     */
    private function getDelayExchange(): \AMQPExchange
    {
        if (null === $this->amqpDelayExchange) {
            $this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
            $this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
            $this->amqpDelayExchange->setType(\AMQP_EX_TYPE_DIRECT);
            $this->amqpDelayExchange->setFlags(\AMQP_DURABLE);
        }

        return $this->amqpDelayExchange;
    }

    /**
     * Creates a delay queue that will delay for a certain amount of time.
     *
     * This works by setting message TTL for the delay and pointing
     * the dead letter exchange to the original exchange. The result
     * is that after the TTL, the message is sent to the dead-letter-exchange,
     * which is the original exchange, resulting on it being put back into
     * the original queue.
     */
    private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
    {
        $queue = $this->amqpFactory->createQueue($this->channel());
        $queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
        $queue->setFlags(\AMQP_DURABLE);
        $queue->setArguments([
            'x-message-ttl' => $delay,
            // delete the delay queue 10 seconds after the message expires
            // publishing another message redeclares the queue which renews the lease
            'x-expires' => $delay + 10000,
            // message should be broadcast to all consumers during delay, but to only one queue during retry
            // empty name is default direct exchange
            'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
            // after being released from to DLX, make sure the original routing key will be used
            // we must use an empty string instead of null for the argument to be picked up
            'x-dead-letter-routing-key' => $routingKey ?? '',
        ]);

        return $queue;
    }

    /**
     * Builds the name shared by a delay queue and its binding key: the
     * configured "queue_name_pattern" with its placeholders replaced, suffixed
     * with "_retry" or "_delay".
     */
    private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
    {
        $action = $isRetryAttempt ? '_retry' : '_delay';

        return str_replace(
            ['%delay%', '%exchange_name%', '%routing_key%'],
            [$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
            $this->connectionOptions['delay']['queue_name_pattern']
        ).$action;
    }

    /**
     * Gets a message from the specified queue.
     *
     * @return \AMQPEnvelope|null The envelope, or null when the queue returned no message
     *
     * @throws \AMQPException
     */
    public function get(string $queueName): ?\AMQPEnvelope
    {
        $this->clearWhenDisconnected();

        if ($this->autoSetupExchange) {
            $this->setupExchangeAndQueues();
        }

        if (false !== $message = $this->queue($queueName)->get()) {
            return $message;
        }

        return null;
    }

    /**
     * Acknowledges the message on the given queue.
     *
     * @return bool The result of \AMQPQueue::ack(), or true when it returns null
     */
    public function ack(\AMQPEnvelope $message, string $queueName): bool
    {
        return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
    }

    /**
     * Negatively acknowledges the message on the given queue.
     *
     * @return bool The result of \AMQPQueue::nack(), or true when it returns null
     */
    public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
    {
        return $this->queue($queueName)->nack($message->getDeliveryTag(), $flags) ?? true;
    }

    /**
     * Declares the main exchange, the configured queues with their bindings,
     * and the delay exchange.
     */
    public function setup(): void
    {
        $this->setupExchangeAndQueues();
        $this->setupDelayExchange();
    }

    /**
     * Declares the main exchange and every configured queue, binds each queue
     * to the exchange and disables further automatic setup.
     */
    private function setupExchangeAndQueues(): void
    {
        $this->exchange()->declareExchange();

        foreach ($this->queuesOptions as $queueName => $queueConfig) {
            $this->queue($queueName)->declareQueue();
            // Without configured binding keys, the queue is bound once with a null key.
            foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
                $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
            }
        }

        $this->autoSetupExchange = false;
    }

    /**
     * Declares the delay exchange and disables its further automatic setup.
     */
    private function setupDelayExchange(): void
    {
        $this->getDelayExchange()->declareExchange();
        $this->autoSetupDelayExchange = false;
    }

    /**
     * @return string[]
     */
    public function getQueueNames(): array
    {
        return array_keys($this->queuesOptions);
    }

    /**
     * Returns the channel, connecting to the broker on first use.
     *
     * Publisher confirms are enabled on the new channel when a
     * "confirm_timeout" option is set. On later calls, when a heartbeat is
     * configured and the last recorded activity is too old, the underlying
     * connection is disconnected.
     *
     * @throws \AMQPException When the connection cannot be established
     */
    public function channel(): \AMQPChannel
    {
        if (null === $this->amqpChannel) {
            $connection = $this->amqpFactory->createConnection($this->connectionOptions);
            // "persistent" is compared as a string, as it is when it comes from a DSN query string.
            $connectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pconnect' : 'connect';

            try {
                $connection->{$connectMethod}();
            } catch (\AMQPConnectionException $e) {
                throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.', 0, $e);
            }

            $this->amqpChannel = $this->amqpFactory->createChannel($connection);

            if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
                $this->amqpChannel->confirmSelect();
                // First callback handles acks, second handles nacks.
                // NOTE(review): returning false presumably stops waitForConfirm() from waiting further - confirm against php-amqp.
                $this->amqpChannel->setConfirmCallback(
                    static function (): bool {
                        return false;
                    },
                    static function () {
                        throw new TransportException('Message publication failed due to a negative acknowledgment (nack) from the broker.');
                    }
                );
            }

            $this->lastActivityTime = time();
        // NOTE(review): the operators of the staleness test were lost in the captured listing;
        // "+ 2 *" (two missed heartbeat intervals) is restored from the upstream source - confirm.
        } elseif (0 < ($this->connectionOptions['heartbeat'] ?? 0) && time() > $this->lastActivityTime + 2 * $this->connectionOptions['heartbeat']) {
            $disconnectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pdisconnect' : 'disconnect';
            $this->amqpChannel->getConnection()->{$disconnectMethod}();
        }

        return $this->amqpChannel;
    }

    /**
     * Returns the queue object for the given name, creating and caching it on
     * first use with the configured flags (default: AMQP_DURABLE) and arguments.
     */
    public function queue(string $queueName): \AMQPQueue
    {
        if (!isset($this->amqpQueues[$queueName])) {
            $queueConfig = $this->queuesOptions[$queueName] ?? [];

            $amqpQueue = $this->amqpFactory->createQueue($this->channel());
            $amqpQueue->setName($queueName);
            $amqpQueue->setFlags($queueConfig['flags'] ?? \AMQP_DURABLE);

            if (isset($queueConfig['arguments'])) {
                $amqpQueue->setArguments($queueConfig['arguments']);
            }

            $this->amqpQueues[$queueName] = $amqpQueue;
        }

        return $this->amqpQueues[$queueName];
    }

    /**
     * Returns the main exchange object, creating it on first use with the
     * configured type (default: fanout), flags (default: AMQP_DURABLE) and arguments.
     */
    public function exchange(): \AMQPExchange
    {
        if (null === $this->amqpExchange) {
            $this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
            $this->amqpExchange->setName($this->exchangeOptions['name']);
            $this->amqpExchange->setType($this->exchangeOptions['type'] ?? \AMQP_EX_TYPE_FANOUT);
            $this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? \AMQP_DURABLE);

            if (isset($this->exchangeOptions['arguments'])) {
                $this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
            }
        }

        return $this->amqpExchange;
    }

    /**
     * Drops the cached channel, queues and exchanges when the channel reports
     * that it is no longer connected, so that they are re-created on next use.
     */
    private function clearWhenDisconnected(): void
    {
        if (!$this->channel()->isConnected()) {
            $this->clear();
        }
    }

    /**
     * Forgets the cached channel, queues and exchanges.
     */
    private function clear(): void
    {
        $this->amqpChannel = null;
        $this->amqpQueues = [];
        $this->amqpExchange = null;
        $this->amqpDelayExchange = null;
    }

    /**
     * Returns the exchange's "default_publish_routing_key" option, or null when it is not set.
     */
    private function getDefaultPublishRoutingKey(): ?string
    {
        return $this->exchangeOptions['default_publish_routing_key'] ?? null;
    }

    /**
     * Purges every configured queue.
     */
    public function purgeQueues()
    {
        foreach ($this->getQueueNames() as $queueName) {
            $this->queue($queueName)->purge();
        }
    }

    /**
     * Returns the routing key of the stamp when it has one, the default
     * publish routing key otherwise.
     */
    private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
    {
        return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
    }

    /**
     * Runs the callable, retrying it up to 3 times when it throws an
     * \AMQPConnectionException; cached AMQP objects are cleared before each
     * retry so that a new connection is opened.
     *
     * @throws \AMQPConnectionException When the callable still fails after the last retry
     */
    private function withConnectionExceptionRetry(callable $callable): void
    {
        $maxRetries = 3;
        $retries = 0;

        retry:
        try {
            $callable();
        } catch (\AMQPConnectionException $e) {
            if (++$retries <= $maxRetries) {
                $this->clear();

                goto retry;
            }

            throw $e;
        }
    }
}

// Registers this class under the \Symfony\Component\Messenger\Transport\AmqpExt\Connection name,
// unless a class with that name is already loaded (the autoloader is not triggered).
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\Connection::class, false)) {
    class_alias(Connection::class, \Symfony\Component\Messenger\Transport\AmqpExt\Connection::class);
}