温斯顿吴 赌场式交易员

阿里云消息服务(MNS)应用开发总结

2017-06-13

背景

MNS和ONS的对比

阿里云提供2种队列服务,在命名上有些混乱,这里先理清一下关系。

两个产品的对比:消息服务MNS和消息队列ONS产品对比

主要区别:

  • MNS仅支持HTTP协议,ONS支持HTTP、TCP、MQTT等多种协议;
  • MNS不支持定时消息,ONS支持;
  • MNS支持批量消息操作,ONS不支持;
  • MNS支持消息优先级,ONS不支持;
  • MNS可靠性99.99999999%,ONS 99.99%

个人认为:ONS的队列引擎是阿里基于MQTT协议的一种实现,即RocketMQ,主要是作为一种开源产品提供出来,如果自己搭建队列服务,可以考虑使用。当然了,如果是自己搭建服务,倒不如直接用RabbitMQ了。 而MNS更像是专门为了对外提供商业化的MQ服务而开发的商业化产品(从它只支持HTTP协议可以看出来),所以如果想直接拿来主义,避免自己维护队列服务器的烦恼,显然MNS是更加合适的选择。

阿里云MNS总共包含两种功能模型:队列模型(pull)和主题订阅模型(push)。

MNS官方文档

官方文档

实践准备

设置专门用于队列操作的AK

Access Key管理 -> 创建Access Key 这里没有使用RAM子账号的方式,而是直接使用当前账户创建了一组新的Access Key ID和Access Key Secret。

获取MNS访问域名

消息服务管理控制台 -> 队列(主题) -> 获取Endpoint,得到消费服务的公网Endpoint: http(s)://1460488513831433.mns.cn-shanghai.aliyuncs.com/

下载SDK

下载MNS PHP SDK,略。

Demo代码文件结构

alymns
  -- php+sdk
  -- Queue
    -- getMessage.php
    -- sendMessage.php
  -- Topic
    -- Publish.php
    -- Subscribe.php

此外还有处理Topic回调的代码。

队列操作

创建队列

消息服务管理控制台 -> 队列 -> 创建队列:

  • 队列名称:woojean-queue 队列的QueueName属性,同一账号同一Region下, 队列名称不能重名;必须以英文字母或者数字开头,剩余名称可以是英文,数字,横划线,长度不超过256个字符。
  • 当前地域:华东2
  • 消息接收长轮询等待时间:30s 队列的PollingWaitSeconds属性, 设为0时关闭长轮询;当不为0时,长轮询模式开启,此时一个消息消费请求只会在取到有效消息或长轮询超时时才返回响应;单位秒,有效值范围为0-30秒。
  • 取出消息隐藏时长:300s 队列的VisibilityTimeout属性,消息从本队列中取出后会被从Active可取状态变成Inactive隐藏状态后,本属性指定了消息隐藏持续时间,这个时间一到,消息会从隐藏恢复成Active可取状态;单位为秒,有效值范围1-43200秒,也即1秒到12小时。
  • 消息最大长度:2048 队列的MaxMessageSize属性, 限定允许发送到该队列的消息体的最大长度;单位为byte, 有效值范围为1024-65536 也即1K到64K。
  • 消息存活时间:1296000 队列的MessageRetentionPeriod属性, 消息在本队列中最长的存活时间,从发送到该队列开始经过此参数指定的时间后,不论消息是否被取出过都将被删除;单位为秒,有效值范围60-1296000秒,也即1分钟到15天。
  • 消息延时:0 队列的DelaySeconds属性,发送消息到本队列的所有消息默认将以本参数指定的秒数延后可被消费;单位为秒,有效值范围为0-604800秒,也即0到 7天。

也可以通过SDK代码创建、删除队列,但是不如通过控制台创建直观。

发送消息

sendMessage.php

<?php

// 引用MNS SDK
$root = dirname(dirname(__FILE__));
require_once($root . '/php_sdk/mns-autoloader.php');

use AliyunMNS\Client;
use AliyunMNS\Exception\MnsException;
use AliyunMNS\Requests\SendMessageRequest;


// 新建队列操作客户端
$endPoint = 'http://1460488513831433.mns.cn-shanghai.aliyuncs.com/';
$accessId = '****';
$accessKey = '****';
$queueName = 'woojean-queue';
$client = new Client($endPoint, $accessId, $accessKey);

// 获取队列的引用
$queue = $client->getQueueRef($queueName);

// 创建待发送的消息
$messageBody = "Hello World!";

// 发送消息
$request = new SendMessageRequest($messageBody);
try {
    $res = $queue->sendMessage($request);
    echo "MessageSent! \n";
} catch (MnsException $e) {
    echo "SendMessage Failed: " . $e;
    return;
}

接收消息

getMessage.php

<?php

// 引用MNS SDK
$root = dirname(dirname(__FILE__));
require_once($root . '/php_sdk/mns-autoloader.php');

use AliyunMNS\Client;
use AliyunMNS\Exception\MnsException;


// 新建队列操作客户端
$endPoint = 'http://1460488513831433.mns.cn-shanghai.aliyuncs.com/';
$accessId = '****';
$accessKey = '****';
$queueName = 'woojean-queue';
$client = new Client($endPoint, $accessId, $accessKey);

// 获取队列的引用
$queue = $client->getQueueRef($queueName);

// 接收消息
$receiptHandle = NULL;
while (true) {
    try {
        // when receiving messages, it's always a good practice to set the waitSeconds to be 30.
        // it means to send one http-long-polling request which lasts 30 seconds at most.
        $res = $queue->receiveMessage(30);

        echo "ReceiveMessage Succeed! \n";
        $messageBody = $res->getMessageBody();
        var_dump($messageBody);

        // 删除消息
        $receiptHandle = $res->getReceiptHandle();
        try {
            $res = $queue->deleteMessage($receiptHandle);
            echo "DeleteMessage Succeed! \n";
        } catch (MnsException $e) {
            echo "DeleteMessage Failed: " . $e;
            return;
        }
    } catch (MnsException $e) {
        /*
         * ReceiveMessage Failed:
         * Code: 404 Message: Message not exist.
         * MnsErrorCode: MessageNotExist
         * RequestId: 593F70CF19E20A932F4CC096
         * HostId: http://1460488513831433.mns.cn-shanghai.aliyuncs.com
         * */
        echo "ReceiveMessage Failed: " . $e;
        return;
    }
}

主题操作

创建主题

消息服务管理控制台 -> 队列 -> 创建队列:

  • 主题名称:woojean-topic 主题的TopicName属性,同一账号同一Region下,主题名称不能重复,必须以英文字母开头,剩余名称可以是英文,数字,横划线,长度不超过256个字符
  • 当前地域:华东 2
  • 消息最大长度(Byte) :2048 主题的MaximumMessageSize属性,允许发送到该主题的消息体的最大长度,单位为byte,有效值范围为:1024(1KB)-65536(64KB),默认值为65536。

点击对应主题的“获取地址”按钮,获取主题的地址: http://1460488513831433.mns.cn-shanghai.aliyuncs.com/topics/woojean-topic

创建回调

回调地址需要外网可以访问,临时在已有的Phalcon项目中添加了一个Post入口

<?php
namespace Controllers;

/**
 * @RoutePrefix("/test")
 */
class TestController extends BaseController
{
    /**
     * @description
     * @Post("/topic")
     */
    public function topicAction()
    {
        $request = \Phalcon\DI::getDefault()->get('request');
        $rawBody = $request->getRawBody();
        $this->log->info($rawBody);

        http_response_code(200);  // 需要返回200告知成功
    }

创建订阅

Subscribe.php

<?php

// 引用MNS SDK
$root = dirname(dirname(__FILE__));
require_once($root . '/php_sdk/mns-autoloader.php');

use AliyunMNS\Client;
use AliyunMNS\Exception\MnsException;
use AliyunMNS\Model\SubscriptionAttributes;

// 新建队列操作客户端
$endPoint = 'http://1460488513831433.mns.cn-shanghai.aliyuncs.com/';
$accessId = '****';
$accessKey = '****';
$topicName = 'woojean-topic';
$client = new Client($endPoint, $accessId, $accessKey);

try
{
    $subscriptionName = "woojean-subscription";
    $endPoint = 'http://sms.dongshier.com/test/topic';
    $attributes = new SubscriptionAttributes($subscriptionName, $endPoint,'BACKOFF_RETRY','SIMPLIFIED');
    $topic = $client->getTopicRef($topicName);
    $topic->subscribe($attributes);
    echo "Subscribed! \n";
}
catch (MnsException $e)
{
    echo "SubscribeFailed: " . $e;
    return;
}

发布消息

Publish.php

<?php

// 引用MNS SDK
$root = dirname(dirname(__FILE__));
require_once($root . '/php_sdk/mns-autoloader.php');

use AliyunMNS\Client;
use AliyunMNS\Exception\MnsException;
use AliyunMNS\Requests\PublishMessageRequest;

// 新建队列操作客户端
$endPoint = 'http://1460488513831433.mns.cn-shanghai.aliyuncs.com/';
$accessId = '****';
$accessKey = '****';
$topicName = 'woojean-topic';
$client = new Client($endPoint, $accessId, $accessKey);

try
{
    $messageBody = "Hello World!";
    $request = new PublishMessageRequest($messageBody);
    $topic = $client->getTopicRef($topicName);
    $res = $topic->publishMessage($request);
    echo "MessagePublished! \n";

    /*
    $topic->unsubscribe($subscriptionName);
    echo "Unsubscribe Succeed! \n";
    */
}
catch (MnsException $e)
{
    echo "PublishMessage Failed: " . $e;
    return;
}

NotifyContentFormat配置的说明文档

查看日志,收到如下请求: XML格式:

info | 13130 | 1497335254.557 | 2017:06:13 06:27:34 | /test/topic
info | 13130 | 1497335254.557 | 2017:06:13 06:27:34 | {"_url":"\/test\/topic"}
info | 13130 | 1497335254.557 | 2017:06:13 06:27:34 | <?xml version="1.0" encoding="UTF-8"?>
<Notification xmlns="http://mns.aliyuncs.com/doc/v1/">
  <TopicOwner>1460488513831433</TopicOwner>
  <TopicName>woojean-topic</TopicName>
  <Subscriber>1460488513831433</Subscriber>
  <SubscriptionName>woojean-subscription</SubscriptionName>
  <MessageId>CB1A8E7EFFA19158-1-15CA022CE05-200000008</MessageId>
  <MessageMD5>ED076287532E86365E841E92BFC50D8C</MessageMD5>
  <Message>Hello World!</Message>
  <PublishTime>1497335254533</PublishTime>
  <SigningCertURL>https://mnstest.oss-cn-hangzhou.aliyuncs.com/x509_public_certificate.pem</SigningCertURL>
</Notification>
*/

SIMPLIFIED格式:

info | 23318 | 1497335910.649 | 2017:06:13 06:38:30 | /test/topic
info | 23318 | 1497335910.649 | 2017:06:13 06:38:30 | {"_url":"\/test\/topic"}
info | 23318 | 1497335910.649 | 2017:06:13 06:38:30 | Hello World!

上一篇 三阶魔方教程

抖音
文章目录