php操作ActiveMQ

本文中的相关环境:

操作系统版本:CentOS Linux release 7.8.2003 (Core)

php版本:PHP 7.1.33 ( NTS )

Stomp扩展版本: 2.0.1


php在操作activemq的时候需要借助一个叫Stomp的扩展,所以要先安装一下该扩展。PS:准确来说,php的Stomp扩展是基于Stomp协议来开发的,而activemq支持多种协议(其中就包括 Stomp协议),所以 php也是通过Stomp协议(php的Stomp扩展)来和activemq进行交互的


安装php的Stomp扩展(pecl一键安装方式)

pecl install https://pecl.php.net/get/stomp-2.0.1.tgz -y

# 安装过程中出现 
OpenSSL install prefix (no to disable SSL support) [/usr] :  # 这里输入yes 按回车继续安装,不输入yes直接按回车 在接下来的编译中会报错 导致安装失败

# 安装完毕后可以在 cli模式使用 php -m | grep Stomp 来查看扩展是否已安装 或 在web模式下使用 phpinfo() 函数来查看是否已安装该扩展


Stomp扩展的各大版本地址:https://pecl.php.net/package/stomp


创建队列,并往队列中写入数据:

/***************** 队列类型是:queue *************************/

$broker = 'tcp://192.168.13.119:61613'; //61613是默认的端口,在activemq目录下的conf/activemq.xml文件可查看到
$queue  = '/queue/userreg'; //创建一个queue类型的队列,队列名字是userreg 注意 这里是queue!这里是queue!这里是queue!

$user_data = array('user_name' => 'zhangsan');

$stomp = new Stomp($broker);

$stomp->send($queue, json_encode($user_data));






/***************** 队列类型是:topic *************************/

$broker = 'tcp://192.168.13.119:61613';
$topic_order = '/topic/order'; //创建一个topic类型的队列,队列名字是order 注意 这里是topic!这里是topic!这里是topic!

$order_data = array('order_sn' => '20201125155856');

$stomp = new Stomp($broker);

$stomp->send($topic_order, json_encode($order_data));



//queue和topic队列类型的区别 是在于读取队列的时候,queue用于点对点模型(一个生产者,一个消费者 是1对1的关系), topic用于发布/订阅模型(一个生产者,多个消费者 是1对多的关系)


普通队列读取(点对点(Point To Point)模型),使用示例:

$broker = 'tcp://192.168.13.119:61613'; 
$queue  = '/queue/userreg';  //注意 这里是queue!这里是queue!这里是queue!

try {
    $stomp = new Stomp($broker);
    
    $stomp->subscribe($queue); ////订阅了一个queue类型的队列 队列名字是userreg
    $frame = $stomp->readFrame();  //读取队列

    echo $frame->body.PHP_EOL;  //输出队列中的具体消息内容
    
    $stomp->ack($frame);  //给activemq服务发送ack通知,activemq服务收到ack通知后会把该消息从队列中移除

} catch(StompException $e) {
    echo $e->getMessage();
}


//一个队列中可能会有很多个数据,一般都是循环读取,示例代码如下:

$broker = 'tcp://192.168.13.119:61613';
$queue  = '/queue/userreg'; //注意 这里是queue!这里是queue!这里是queue!

try{
    $stomp = new Stomp($broker);
    $stomp->subscribe($queue);


    //hasFrame()方法:判断队列中是否有数据, false:队列中没有数据  true:队列中有数据
    while($stomp->hasFrame() !== false)
    {
        $frame = $stomp->readFrame();

        echo $frame->body.PHP_EOL;

        $stomp->ack($frame);

        usleep(5000000); //休眠5000毫秒
    }

}catch(StompException $e) {
    echo $e->getMessage();
}

点对点消息模型特点:只有一个消费者可以接收到消息,并且不能进行重复消费。


发布/订阅(Publish/Subscribe)模型,使用示例:

$broker = 'tcp://192.168.13.119:61613';
$topic= '/topic/order'; //注意 这里是topic!这里是topic!这里是topic!

try{

    $stomp = new Stomp($broker);
    $stomp->subscribe($topic);   //订阅了一个topic类型的队列 队列名字是order

    //hasFrame()方法:判断队列中是否有数据, false:队列中没有数据  true:队列中有数据
    while($stomp->hasFrame() !== false)
    {
        $frame = $stomp->readFrame();

        echo $frame->body.PHP_EOL;

        $stomp->ack($frame);

        usleep(5000000); //休眠5000毫秒
    }

}catch(StompException $e) {
    echo $e->getMessage();
}


//假如说下面还有1、2...N份代码都订阅了 /topic/order 队列,那么只要往/topic/order 发送消息后,1、2...N份代码都会同时读取到 /topic/order 队列中的内容,前提是 1、2...N份代码都在实时监听 /topic/order 队列才能收到

发布/订阅模型特点:消息是1对多的、多个消费者可以收到消息、可以重复消费消息


activemq中的事物处理

$broker = 'tcp://192.168.13.119:61613';

$queue1 = '/queue/sendsms';
$queue2 = '/queue/search';

$stomp = new Stomp($broker);
$stomp->subscribe($queue);

$stomp->begin("news");

if($stomp->send($queue1, 'sendsms队列中的内容', array('transaction' => 'news')) &&
    $stomp->send($queue2, 'search队列中的内容', array('transaction' => 'news'))
)
{
    $stomp->commit("news");
}

//begin方法中的值"news" 可以随便填写,不过下面的transaction的值和commit方法中的值必须和begin中的值是一样的



相关文章:linux下安装ActiveMQ服务(消息队列)



声明:禁止任何非法用途使用,凡因违规使用而引起的任何法律纠纷,本站概不负责。

小周博客
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

精彩评论

全部回复 0人评论 7,777人参与

loading