Module asyncstreams

This module provides the asynchronous stream interface and some of the implementations including AsyncStringStream, AsyncFileStream and AsyncSocketStream.

If you want to implement your own asynchronous stream, you must provide the implementations of the stream operations as defined in AsyncStreamObj. You can also use the provided helpers for operations the stream does not support, such as setPositionNotImplemented, flushNop, etc.

Example:

import asyncdispatch, asyncstreams, strutils

proc main {.async.} =
  var s = newAsyncStringStream("""Hello
world!""")
  var res = newSeq[string]()
  while true:
    let l = await s.readLine()
    if l == "":
      break
    res.add(l)
  doAssert(res.join(", ") == "Hello, world!")
waitFor main()

Types

AsyncStream* = ref AsyncStreamObj
Reference to the asynchronous stream.   Source Edit
AsyncStreamObj* = object of RootObj
  closeImpl*: proc (s: AsyncStream) {.
nimcall, tags: [], gcsafe
.} atEndImpl*: proc (s: AsyncStream): bool {.
nimcall, tags: [], gcsafe
.} setPositionImpl*: proc (s: AsyncStream; pos: int64) {.
nimcall, tags: [], gcsafe
.} getPositionImpl*: proc (s: AsyncStream): int64 {.
nimcall, tags: [], gcsafe
.} readImpl*: proc (s: AsyncStream; buf: pointer; size: int): Future[int] {.
nimcall, tags: [ReadIOEffect], gcsafe
.} peekImpl*: proc (s: AsyncStream; buf: pointer; size: int): Future[int] {.
nimcall, tags: [ReadIOEffect], gcsafe
.} peekLineImpl*: proc (s: AsyncStream): Future[string] {.
nimcall, tags: [ReadIOEffect], gcsafe
.} writeImpl*: proc (s: AsyncStream; buf: pointer; size: int): Future[void] {.
nimcall, tags: [WriteIOEffect], gcsafe
.} flushImpl*: proc (s: AsyncStream): Future[void] {.
nimcall, tags: [], gcsafe
.}
Asynchronous stream interface. Implementation details:
  • setPositionImpl can be nil, if the stream doesn't support setPosition.
  • getPositionImpl can be nil, if the stream doesn't support getPosition.
  • peekImpl can be nil, if the stream doesn't support peekBuffer. In that case, this operation can be emulated via getPosition and setPosition by the module itself.
  • peekLineImpl is the optimized version of the peekLine operation. If it's nil, the module tries to emulate peekLine, where possible, via:
    • getPosition, setPosition and readLine
    • peekBuffer with fixed size buffer.
  • if flushImpl is nil, flush operation does nothing.
  Source Edit
AsyncFileStream* = ref AsyncFileStreamObj
Reference to the asynchronous file stream.   Source Edit
AsyncFileStreamObj* = object of AsyncStreamObj
  f: AsyncFile
  eof: bool
  closed: bool
Asynchronous file stream.   Source Edit
AsyncStringStream* = ref AsyncStringStreamObj
Reference to the asynchronous string stream.   Source Edit
AsyncStringStreamObj* = object of AsyncStreamObj
  data: string
  pos: int
  eof: bool
  closed: bool
Asynchronous string stream.   Source Edit
AsyncSocketStream* = ref AsyncSocketStreamObj
Reference to the asynchronous socket stream.   Source Edit
AsyncSocketStreamObj* = object of AsyncStreamObj
  s: AsyncSocket
  closed: bool
Asynchronous socket stream.   Source Edit
AsyncBufferedStream* = ref AsyncBufferedStreamObj
Reference to the asynchronous buffered stream.   Source Edit
AsyncBufferedStreamObj* = object of AsyncStreamObj
  s: AsyncStream
  buff: seq[byte]
  length: int
  pos: int
Asynchronous buffered stream. Adds the peekBuffer operation to streams that don't support it. For example:
var s: AsyncSocketStream
# This will throw the exception:
var data = s.peekData(100)

var bs = newAsyncBufferedStream(s)
# And this won't
data = bs.peekData(100)
  Source Edit

Procs

proc flush*(s: AsyncStream) {.
async
.}
Flushes the buffers of the stream s.   Source Edit
proc close*(s: AsyncStream)
Closes the stream s.   Source Edit
proc atEnd*(s: AsyncStream): bool
Checks if all data has been read from the stream s   Source Edit
proc getPosition*(s: AsyncStream): int64
Retrieves the current position in the stream s   Source Edit
proc setPosition*(s: AsyncStream; pos: int64)
Sets the current position in the stream s   Source Edit
proc readBuffer*(s: AsyncStream; buffer: pointer; size: int): Future[int] {.
async
.}
Reads up to size bytes from the stream s into the buffer   Source Edit
proc peekBuffer*(s: AsyncStream; buffer: pointer; size: int): Future[int] {.
async
.}
Reads up to size bytes from the stream s into the buffer without moving stream position   Source Edit
proc writeBuffer*(s: AsyncStream; buffer: pointer; size: int) {.
async
.}
Writes size bytes from the buffer into the stream s   Source Edit
proc readData*(s: AsyncStream; size: int): Future[string] {.
async
.}
Reads up to size bytes from the stream s into a string   Source Edit
proc peekData*(s: AsyncStream; size: int): Future[string] {.
async
.}
Peeks up to size bytes from the stream s into a string   Source Edit
proc writeData*(s: AsyncStream; data: string) {.
async
.}
Writes data to the stream s   Source Edit
proc readChar*(s: AsyncStream): Future[char] {.
async
.}
Reads the char from the stream s   Source Edit
proc peekChar*(s: AsyncStream): Future[char] {.
async
.}
Peeks the char from the stream s   Source Edit
proc writeChar*(s: AsyncStream; c: char) {.
async
.}
Writes the char to the stream s   Source Edit
proc readLine*(s: AsyncStream): Future[string] {.
async
.}
Reads the line from the stream s until end of stream or the new line delimiter   Source Edit
proc peekLine*(s: AsyncStream): Future[string] {.
async
.}
Peeks the line from the stream s until end of stream or the new line delimiter. It works only if the stream supports peekLine itself, allows getting/setting the stream position, or supports peek.   Source Edit
proc writeLine*(s: AsyncStream; data: string) {.
async
.}
Writes the line data to the stream s followed by the new line delimiter   Source Edit
proc readAll*(s: AsyncStream): Future[string] {.
async
.}
Reads the data from the stream s until its end   Source Edit
proc readByte*(s: AsyncStream): Future[byte] {.
async
.}
Reads byte from the stream s   Source Edit
proc peekByte*(s: AsyncStream): Future[byte] {.
async
.}
Peeks byte from the stream s   Source Edit
proc writeByte*(s: AsyncStream; data: byte) {.
async
.}
Writes byte to the stream s   Source Edit
proc readInt8*(s: AsyncStream): Future[int8] {.
async
.}
Reads int8 from the stream s   Source Edit
proc peekInt8*(s: AsyncStream): Future[int8] {.
async
.}
Peeks int8 from the stream s   Source Edit
proc writeInt8*(s: AsyncStream; data: int8) {.
async
.}
Writes int8 to the stream s   Source Edit
proc readInt16*(s: AsyncStream): Future[int16] {.
async
.}
Reads int16 from the stream s   Source Edit
proc peekInt16*(s: AsyncStream): Future[int16] {.
async
.}
Peeks int16 from the stream s   Source Edit
proc writeInt16*(s: AsyncStream; data: int16) {.
async
.}
Writes int16 to the stream s   Source Edit
proc readInt32*(s: AsyncStream): Future[int32] {.
async
.}
Reads int32 from the stream s   Source Edit
proc peekInt32*(s: AsyncStream): Future[int32] {.
async
.}
Peeks int32 from the stream s   Source Edit
proc writeInt32*(s: AsyncStream; data: int32) {.
async
.}
Writes int32 to the stream s   Source Edit
proc readInt64*(s: AsyncStream): Future[int64] {.
async
.}
Reads int64 from the stream s   Source Edit
proc peekInt64*(s: AsyncStream): Future[int64] {.
async
.}
Peeks int64 from the stream s   Source Edit
proc writeInt64*(s: AsyncStream; data: int64) {.
async
.}
Writes int64 to the stream s   Source Edit
proc readUInt8*(s: AsyncStream): Future[uint8] {.
async
.}
Reads uint8 from the stream s   Source Edit
proc peekUInt8*(s: AsyncStream): Future[uint8] {.
async
.}
Peeks uint8 from the stream s   Source Edit
proc writeUInt8*(s: AsyncStream; data: uint8) {.
async
.}
Writes uint8 to the stream s   Source Edit
proc readUInt16*(s: AsyncStream): Future[uint16] {.
async
.}
Reads uint16 from the stream s   Source Edit
proc peekUInt16*(s: AsyncStream): Future[uint16] {.
async
.}
Peeks uint16 from the stream s   Source Edit
proc writeUInt16*(s: AsyncStream; data: uint16) {.
async
.}
Writes uint16 to the stream s   Source Edit
proc readUInt32*(s: AsyncStream): Future[uint32] {.
async
.}
Reads uint32 from the stream s   Source Edit
proc peekUInt32*(s: AsyncStream): Future[uint32] {.
async
.}
Peeks uint32 from the stream s   Source Edit
proc writeUInt32*(s: AsyncStream; data: uint32) {.
async
.}
Writes uint32 to the stream s   Source Edit
proc readUInt64*(s: AsyncStream): Future[uint64] {.
async
.}
Reads uint64 from the stream s   Source Edit
proc peekUInt64*(s: AsyncStream): Future[uint64] {.
async
.}
Peeks uint64 from the stream s   Source Edit
proc writeUInt64*(s: AsyncStream; data: uint64) {.
async
.}
Writes uint64 to the stream s   Source Edit
proc readInt*(s: AsyncStream): Future[int] {.
async
.}
Reads int from the stream s   Source Edit
proc peekInt*(s: AsyncStream): Future[int] {.
async
.}
Peeks int from the stream s   Source Edit
proc writeInt*(s: AsyncStream; data: int) {.
async
.}
Writes int to the stream s   Source Edit
proc readUInt*(s: AsyncStream): Future[uint] {.
async
.}
Reads uint from the stream s   Source Edit
proc peekUInt*(s: AsyncStream): Future[uint] {.
async
.}
Peeks uint from the stream s   Source Edit
proc writeUInt*(s: AsyncStream; data: uint) {.
async
.}
Writes uint to the stream s   Source Edit
proc readFloat32*(s: AsyncStream): Future[float32] {.
async
.}
Reads float32 from the stream s   Source Edit
proc peekFloat32*(s: AsyncStream): Future[float32] {.
async
.}
Peeks float32 from the stream s   Source Edit
proc writeFloat32*(s: AsyncStream; data: float32) {.
async
.}
Writes float32 to the stream s   Source Edit
proc readFloat64*(s: AsyncStream): Future[float64] {.
async
.}
Reads float64 from the stream s   Source Edit
proc peekFloat64*(s: AsyncStream): Future[float64] {.
async
.}
Peeks float64 from the stream s   Source Edit
proc writeFloat64*(s: AsyncStream; data: float64) {.
async
.}
Writes float64 to the stream s   Source Edit
proc readFloat*(s: AsyncStream): Future[float] {.
async
.}
Reads float from the stream s   Source Edit
proc peekFloat*(s: AsyncStream): Future[float] {.
async
.}
Peeks float from the stream s   Source Edit
proc writeFloat*(s: AsyncStream; data: float) {.
async
.}
Writes float to the stream s   Source Edit
proc readBool*(s: AsyncStream): Future[bool] {.
async
.}
Reads bool from the stream s   Source Edit
proc peekBool*(s: AsyncStream): Future[bool] {.
async
.}
Peeks bool from the stream s   Source Edit
proc writeBool*(s: AsyncStream; data: bool) {.
async
.}
Writes bool to the stream s   Source Edit
proc newAsyncFileStream*(fileName: string; mode = fmRead): AsyncStream
Creates the new AsyncFileStream from the file named fileName with given mode.   Source Edit
proc newAsyncFileStream*(f: AsyncFile): AsyncFileStream
Creates the new AsyncFileStream from the AsyncFile f.   Source Edit
proc `$`*(s: AsyncStringStream): string
Converts s to string   Source Edit
proc newAsyncStringStream*(data = ""): AsyncStringStream
Creates AsyncStringStream filled with data   Source Edit
proc newAsyncSocketStream*(s: AsyncSocket): AsyncSocketStream
Creates new AsyncSocketStream from the AsyncSocket s   Source Edit
proc newAsyncBufferedStream*(s: AsyncStream; buff: seq[byte]): AsyncBufferedStream
  Source Edit
proc newAsyncBufferedStream*(s: AsyncStream; buffLen = 4096): AsyncBufferedStream
  Source Edit

Templates

template wrapAsyncStream*(T: typedesc[AsyncStream]; streamFieldName: untyped): untyped

Copies all the operations of the stream result.streamFieldName to the stream of type T. The result variable of the calling context must refer to the stream of type T.

The example of wrapping AsyncStringStream:

type
  WS = ref WSObj
  WSObj = object of AsyncStreamObj
    wrappedStream: AsyncStream

proc newWS(s: AsyncStream): WS =
  new result
  result.wrappedStream = s
  wrapAsyncStream(WS, wrappedStream)
  Source Edit