使用aschannel(v2)流式传输数据

更新

aschannel是一个流端点,允许客户端在生成时实时检索每个提交任务的结果。流式API与常规API的不同之处在于,连接无限期保持打开,并且消息在可用时立即发送到客户端。

客户使用asmaster订阅一个或多个帐户,或直接通过asapi请求数据,结果将在可用时实时流式传输到客户端。

连接到频道

可以使用以下命令建立与aschannel的连接:

$ curl \
    -X GET \
    -H "Authorization: Token <TOKEN>" \
    -d "stream=<STREAM>" \
    https://aschannel.reincubate.com/stream/

其中<TOKEN>是客户端的令牌, <STREAM>是客户端的流ID。提交任务时,此ID由asapi端点提供,并且是与客户端令牌关联的常量值。

HTTP响应代码疑难解答

连接到aschannel时 ,HTTP响应代码将在响应头中发送。下表描述了它们的含义。

状态文本描述
200 一切都好。
400 错误的请求请求格式不正确。
401 擅自客户端令牌无效。
503 暂停服务 aschannel目前无法使用:查看status.reincubate.com

解释流

来自aschannel的每条消息分为两部分:标题和有效负载。标头包含有关消息的元数据,而有效负载包含消息本身。这可以是系统消息,也可以是客户端请求的数据。标题和有效负载都以回车符和新行结束,每个都以一个数字作为前缀,表示下一段数据的长度。

<HEADER_LENGTH>\r\n
<HEADER>\r\n
<PAYLOAD_LENGTH>\r\n
<PAYLOAD>\r\n

<HEADER_LENGTH><PAYLOAD_LENGTH>都是整数,都表示下一个字段的长度, 包括 \r\n 。标头始终采用 JSON编码,但有效负载格式因其类型而异。因此,可以使用Python从aschannel使用消息,如下所示

buf = response.raw
while not buf.closed:
    header_length = buf.readline()
    header = buf.read(header_length)
    payload_length = buf.readline()
    payload = buf.read(payload_length)

    handle_payload(header, payload)

在第一次连接时,响应看起来与此类似,并且连接将保持打开状态:

66
{ "type": "system",
  "id": "cff35e74-c709-4374-348e-03ea9a2cee61"}
45
{ "message": "Message streaming has begun."}

这里, 6645分别是报头长度和有效载荷长度。从另一个终端重新连接相同的密钥将给出:

66
{ "type": "system",
  "id": "ad17c5a5-c658-4736-876e-cf7901be7292"}
155
{ "message": "A new HTTP connection has been opened using your your authentication details, culling old connections.",
  "error": "CULLING_OLD_CONNECTIONS"
}

解释分块有效载荷

任何大于几千字节的有效载荷都将被拆分成较小的块,客户端必须重新组装这些块以形成原始有效载荷。标头将包含名为chunktotal_chunkschunk_size字段。即使只有一个块,这些头也可能存在。

例如, login任务的分块结果将导致输出如下:

123
{ "chunk": 1,
  "total_chunks": 1,
  "type": "log-in",
  "task_id": "2a24db6c-6870-4a27-a45a-a5c172d8ec98",
  "chunk_size": 16384
}
51
{ "message": "Log-in successful",
  "success": true
}

接收任务结果

以下是icloud服务上fetch_data任务的消息响应示例:

137
{ "stream": "<STREAM>",
  "task_id": "<TASK_ID>",
  "chunk": 1,
  "total_chunks": 1,
  "chunk_size": 16384,
  "type": "<PAYLOAD_TYPE>"
}
288
{ "sms": [{
    "conversation_id": "+447910000000",
    "from_me": false,
    "handle": "vodafone",
    "attachments": [],
    "deleted": false,
    "type": "SMS",
    "date": "2016-09-16 11:52:53.000000",
    "text": "Welcome to Vodafone!",
    "group_handles": null,
    "id": 1
  }]
}

这里, <TASK_ID>与提交任务时asapi给出的<TASK_ID>相同。 <PAYLOAD_TYPE>描述了有效载荷的性质,并将使用asapi提交的动作的slug进行描述。

接收系统消息

有时, ricloud可能需要将系统消息传输给客户端。这些始终由标题的type字段设置为system来标识。例如:

16
{ "type": "system"
}
97
{ "code": "client-too-slow",
  "message": "The client is reading data too slowly. Disconnecting."
}

可以使用以下系统消息code值:

响应摘要
reconnect 需要重新连接
CULLING_OLD_CONNECTIONS 同时连接
WARNING_SLOW_CONNECTION 缓冲警告
CLOSING_SLOW_CONNECTION 客户太慢了

需要重新连接

通道可以指示客户端重新连接,如果是这样,客户端应该尽快打开与提供的端点的新连接。

同时连接

该通道仅允许每个客户端进行一次打开连接,如果客户端打开新连接,则旧消息将被此消息终止。

缓冲警告

客户端可能会因为无法按照提供的速率使用消息而断开连接。如果它们落后,它们将被警告,当内部缓冲区达到为其保留的消息的40%的预定义阈值时开始。

客户太慢了

当消息消耗太慢时,连接终止时发送此消息。

如果客户端有积压的任务并且很容易消耗消息,则会存在重复断开连接的风险,从而丢失许多或所有任务结果。一旦客户端重新连接,它们将仅仅是在断开连接之后已完成的任务的流式传输结果。

我们能帮你什么吗?

我们的支持团队在这里提供帮助!

我们的办公时间是格林威治标准时间周一至周五上午9点至下午5点。 时间目前是 6:28 PM的 GMT。

我们力争在一个工作日内答复所有垂询。

转到支持部分 › 联系企业团队 ›
我们的支持团队非常棒

© 2008 - 2019 Reincubate Ltd. 保留所有权利。 在英格兰和威尔士注册 #5189175, VAT GB151788978. Reincubate®是注册商标。 隐私权和条款. 我们推荐多因素认证。 在伦敦建立了爱情。