目录一:拓展安装[code]composer require enqueue/amqp-lib[/code]文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md 二:方法先容1:连接rabbitmq[code]$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88',//host 'port' => '5672',//端口 'vhost' => '/',//虚拟主机 'user' => 'admin',//账号 'pass' => 'admin',//暗码 ]); $context = $factory->createContext(); [/code]2:声明主题[code]//声明并创建主题 $exchangeName = 'exchange'; $fooTopic = $context->createTopic($exchangeName); $fooTopic->setType(AmqpTopic::TYPE_FANOUT); $context->declareTopic($fooTopic); //删除主题 $context->deleteTopic($fooTopic); [/code]3:声明队列[code]//声明并创建队列 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $context->declareQueue($fooQueue); //删除队列 $context->deleteQueue($fooQueue); [/code]4:将队列绑定到主题[code]$context->bind(new AmqpBind($fooTopic, $fooQueue)); [/code]5:发送消息[code]//向队列发送消息 $message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message); //向队列发送优先消息 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue(queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); //设置队列的最大优先级 $fooQueue->setArguments(['x-max-priority' => 10]); $context->declareQueue($fooQueue); $message = $context->createMessage('Hello world!'); $context->createProducer() ->setPriority(5) //设置优先级,优先级越高,消息越快到达斲丧者 ->send($fooQueue, $message); //向队列发送延时消息 $message = $context->createMessage('Hello world!'); $context->createProducer() ->setDelayStrategy(new RabbitMqDlxDelayStrategy()) ->setDeliveryDelay(5000) //消息延时5秒 ->send($fooQueue, $message); [/code]6:斲丧消息【接收消息】[code]//斲丧消息 $consumer = $context->createConsumer($fooQueue); $message = $consumer->receive(); // process a message //业务代码 $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应使命 // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应使命 //订阅斲丧者 $fooConsumer = $context->createConsumer($fooQueue); $subscriptionConsumer = $context->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message //业务代码 $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应使命 // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应使命 return true; }); $subscriptionConsumer->consume(); //清除队列消息 $queueName = 'rabbitmq'; $queue = $context->createQueue($queueName); $context->purgeQueue($queue); [/code]三:简单实现1:发送消息[code]//连接rabbitmq $factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false, ]); $context = $factory->createContext(); //声明主题 $exchangeName = 'exchange'; $fooTopic = $context->createTopic($exchangeName); $fooTopic->setType(AmqpTopic::TYPE_FANOUT); $context->declareTopic($fooTopic); //声明队列 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $context->declareQueue($fooQueue); //将队列绑定到主题 $context->bind(new AmqpBind($fooTopic, $fooQueue)); //发送消息到队列 $message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message); [/code]2:斲丧消息[code]$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false, ]); $context = $factory->createContext(); $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooConsumer = $context->createConsumer($fooQueue); $subscriptionConsumer = $context->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message //业务代码 $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应使命 // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应使命 return true; }); $subscriptionConsumer->consume();[/code]到此这篇关于PHP使用enqueue/amqp-lib实现rabbitmq使命处理的文章就先容到这了,更多相干PHP rabbitmq使命处理内容请搜刮脚本之家从前的文章或继续欣赏下面的相干文章盼望大家以后多多支持脚本之家! 来源:https://www.jb51.net/program/317443dm3.htm 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
|手机版|小黑屋|梦想之都-俊月星空
( 粤ICP备18056059号 )|网站地图
GMT+8, 2025-7-1 22:36 , Processed in 0.030896 second(s), 18 queries .
Powered by Mxzdjyxk! X3.5
© 2001-2025 Discuz! Team.