Streaming data with aschannel (v2)

Updated

aschannel is a streaming endpoint which allows clients to retrieve the results of each submitted task -- in real-time -- as they are generated. Streaming APIs differ from regular APIs in that a connection is held open indefinitely and messages are sent to the client immediately on becoming available.

Clients subscribe to one or more accounts using asmaster, or request data directly via asapi, and results will be streamed to the client in real-time once available.

Connecting to the channel

A connection to aschannel can be made using the following command:

$ curl \
    -X GET \
    -H "Authorization: Token <TOKEN>" \
    -d "stream=<STREAM>" \
    https://aschannel.reincubate.com/stream/

Where <TOKEN> is the client's token, and <STREAM> the client's stream ID. This ID is provided by the asapi endpoint when a task is submitted, and it is a constant value tied to a client's token.

Troubleshooting HTTP response codes

When connecting to aschannel an HTTP response code will be sent in the response header. The table below describes their meanings.

Status Text Description
200 OK Everything is fine.
400 Bad Request The request is malformed.
401 Unauthorized Client token is invalid.
503 Service Unavailable aschannel is currently unavailable: check status.reincubate.com

Interpreting the stream

Each message from aschannel is in two parts: a header and a payload. The header contains metadata about the message, while the payload contains the message itself. This could be a system message, or the data that the client has requested. Both the header and the payload end in a carriage return and a new line, and each is prefixed by a number indicating the length of the following piece of data.

<HEADER_LENGTH>\r\n
<HEADER>\r\n
<PAYLOAD_LENGTH>\r\n
<PAYLOAD>\r\n

Both <HEADER_LENGTH> and <PAYLOAD_LENGTH> are integers and both represent the length of the next field including the \r\n. The header is always JSON encoded, but the payload format varies depending on its type. Consequently, one could consume messages from the aschannel with Python like so:

buf = response.raw
while not buf.closed:
    header_length = buf.readline()
    header = buf.read(header_length)
    payload_length = buf.readline()
    payload = buf.read(payload_length)

    handle_payload(header, payload)

On first connecting, the response will look similar to this, and the connection will stay open:

66
{ "type": "system",
  "id": "cff35e74-c709-4374-348e-03ea9a2cee61"}
45
{ "message": "Message streaming has begun."}

Here, 66 and 45 are the header length and payload length respectively. Reconnecting with the same key from another terminal will give this:

66
{ "type": "system",
  "id": "ad17c5a5-c658-4736-876e-cf7901be7292"}
155
{ "message": "A new HTTP connection has been opened using your your authentication details, culling old connections.",
  "error": "CULLING_OLD_CONNECTIONS"
}

Interpreting chunked payloads

Any payload that is larger than a few kilobytes will be split into smaller chunks, which a client must reassemble to form the original payload. Headers will contain fields named chunk, total_chunks, and chunk_size. These headers may be present even when there is only a single chunk.

As an example, a chunked result for a login task would result in an output as below:

123
{ "chunk": 1,
  "total_chunks": 1,
  "type": "log-in",
  "task_id": "2a24db6c-6870-4a27-a45a-a5c172d8ec98",
  "chunk_size": 16384
}
51
{ "message": "Log-in successful",
  "success": true
}

Receiving task results

The following is an example of a message response from a fetch_data task on the icloud service:

137
{ "stream": "<STREAM>",
  "task_id": "<TASK_ID>",
  "chunk": 1,
  "total_chunks": 1,
  "chunk_size": 16384,
  "type": "<PAYLOAD_TYPE>"
}
288
{ "sms": [{
    "conversation_id": "+447910000000",
    "from_me": false,
    "handle": "vodafone",
    "attachments": [],
    "deleted": false,
    "type": "SMS",
    "date": "2016-09-16 11:52:53.000000",
    "text": "Welcome to Vodafone!",
    "group_handles": null,
    "id": 1
  }]
}

Here, the <TASK_ID> is the same <TASK_ID> given by asapi when the task was submitted. The <PAYLOAD_TYPE> describes the nature of the payload, and will be described using the slug of the action submitted via asapi.

Receiving system messages

From time to time, ricloud may need to transmit system messages to the client. These are always identified by the header's type field being set to system. For example:

16
{ "type": "system"
}
97
{ "code": "client-too-slow",
  "message": "The client is reading data too slowly. Disconnecting."
}

The following system message code values are possible:

Response Summary
reconnect Reconnection required
CULLING_OLD_CONNECTIONS Simultaneous connection
WARNING_SLOW_CONNECTION Buffer warning
CLOSING_SLOW_CONNECTION Client too slow

Reconnection required

The channel may instruct the client to reconnect, if so the client should open a new connection to the supplied endpoint as soon as possible.

Simultaneous connection

The channel only permits a single open connection per client, if a client opens a new connection the old one will be killed with this message.

Buffer warning

Clients can be disconnected for failing to consume messages at the rate they are being provided. They will be warned if they are falling behind, starting when the internal buffer reaches a predefined threshold of 40% of their messages being held for them.

Client too slow

This message is sent as the connection is terminated when consumption of messages is too slow.

If the client has a backlog of tasks and does consume messages readily they run the risk of being repeatedly disconnected, and thus losing many or all task results. Once a client reconnects, they will only be streamed results from tasks that have completed subsequent to the disconnection.

How can we help?

Our support team are here to help!

Our office hours are Monday to Friday, 9 AM to 5 PM GMT. The time is currently 2:52 AM GMT.

We aim to reply to all messages within one working day.

Go to support section › Contact the enterprise team ›
Our awesome support team

© 2008 - 2019 Reincubate Ltd. All rights reserved. Registered in England and Wales #5189175, VAT GB151788978. Reincubate® is a registered trademark. Privacy & terms. We recommend 2FA. Built with in London.