This module provides the asynchronous stream interface and some of the implementations including AsyncStringStream, AsyncFileStream and AsyncSocketStream.
If you want to implement your own asynchronous stream, you must provide the
implementations of the stream operations as defined in AsyncStreamObj. Also, you can use some helpers for the absent operations, like setPositionNotImplemented, flushNop, etc.
Example:
import asyncdispatch, asyncstreams, strutils

proc main {.async.} =
  var s = newAsyncStringStream("""Hello
world!""")
  var res = newSeq[string]()
  while true:
    let l = await s.readLine()
    if l == "":
      break
    res.add(l)
  doAssert(res.join(", ") == "Hello, world!")

waitFor main()
Types
AsyncStream* = ref AsyncStreamObj
- Reference to the asynchronous stream. Source Edit
AsyncStreamObj* = object of RootObj
  closeImpl*: proc (s: AsyncStream) {.nimcall, tags: [], gcsafe.}
  atEndImpl*: proc (s: AsyncStream): bool {.nimcall, tags: [], gcsafe.}
  setPositionImpl*: proc (s: AsyncStream; pos: int64) {.nimcall, tags: [], gcsafe.}
  getPositionImpl*: proc (s: AsyncStream): int64 {.nimcall, tags: [], gcsafe.}
  readImpl*: proc (s: AsyncStream; buf: pointer; size: int): Future[int] {.nimcall, tags: [ReadIOEffect], gcsafe.}
  peekImpl*: proc (s: AsyncStream; buf: pointer; size: int): Future[int] {.nimcall, tags: [ReadIOEffect], gcsafe.}
  peekLineImpl*: proc (s: AsyncStream): Future[string] {.nimcall, tags: [ReadIOEffect], gcsafe.}
  writeImpl*: proc (s: AsyncStream; buf: pointer; size: int): Future[void] {.nimcall, tags: [WriteIOEffect], gcsafe.}
  flushImpl*: proc (s: AsyncStream): Future[void] {.nimcall, tags: [], gcsafe.}
-
Asynchronous stream interface. Implementation details:
- setPositionImpl can be nil, if the stream doesn't support setPosition.
- getPositionImpl can be nil, if the stream doesn't support getPosition.
- peekImpl can be nil, if the stream doesn't support peekBuffer. In that case, this operation can be emulated via getPosition and setPosition by the module itself.
- peekLineImpl is the optimized version of the peekLine operation. If it's nil, the module tries to emulate peekLine, where possible, via:
- getPosition, setPosition and readLine
- peekBuffer with fixed size buffer.
- if flushImpl is nil, flush operation does nothing.
AsyncFileStream* = ref AsyncFileStreamObj
- Reference to the asynchronous file stream. Source Edit
AsyncFileStreamObj* = object of AsyncStreamObj f: AsyncFile eof: bool closed: bool
- Asynchronous file stream. Source Edit
AsyncStringStream* = ref AsyncStringStreamObj
- Reference to the asynchronous string stream. Source Edit
AsyncStringStreamObj* = object of AsyncStreamObj data: string pos: int eof: bool closed: bool
- Asynchronous string stream. Source Edit
AsyncSocketStream* = ref AsyncSocketStreamObj
- Reference to the asynchronous socket stream. Source Edit
AsyncSocketStreamObj* = object of AsyncStreamObj s: AsyncSocket closed: bool
- Asynchronous socket stream. Source Edit
AsyncBufferedStream* = ref AsyncBufferedStreamObj
- Reference to the asynchronous buffered stream. Source Edit
AsyncBufferedStreamObj* = object of AsyncStreamObj s: AsyncStream buff: seq[byte] length: int pos: int
-
Asynchronous buffered stream. Adds the peekBuffer operation to streams that don't support it. For example:
var s: AsyncSocketStream
# This will throw the exception:
var data = s.peekData(100)
var bs = newAsyncBufferedStream(s)
# And this won't
data = bs.peekData(100)
Source Edit
Procs
proc flush*(s: AsyncStream) {.
async.}- Flushes the buffers of the stream s. Source Edit
proc close*(s: AsyncStream)
- Closes the stream s. Source Edit
proc atEnd*(s: AsyncStream): bool
- Checks if all data has been read from the stream s Source Edit
proc getPosition*(s: AsyncStream): int64
- Retrieves the current position in the stream s Source Edit
proc setPosition*(s: AsyncStream; pos: int64)
- Sets the current position in the stream s Source Edit
proc readBuffer*(s: AsyncStream; buffer: pointer; size: int): Future[int] {.
async.}- Reads up to size bytes from the stream s into the buffer Source Edit
proc peekBuffer*(s: AsyncStream; buffer: pointer; size: int): Future[int] {.
async.}- Reads up to size bytes from the stream s into the buffer without moving stream position Source Edit
proc writeBuffer*(s: AsyncStream; buffer: pointer; size: int) {.
async.}- Writes size bytes from the buffer into the stream s Source Edit
proc readData*(s: AsyncStream; size: int): Future[string] {.
async.}- Reads up to size bytes into the string from the stream s Source Edit
proc peekData*(s: AsyncStream; size: int): Future[string] {.
async.}- Peeks up to size bytes into the string from the stream s Source Edit
proc writeData*(s: AsyncStream; data: string) {.
async.}- Writes data to the stream s Source Edit
proc readChar*(s: AsyncStream): Future[char] {.
async.}- Reads the char from the stream s Source Edit
proc peekChar*(s: AsyncStream): Future[char] {.
async.}- Peeks the char from the stream s Source Edit
proc writeChar*(s: AsyncStream; c: char) {.
async.}- Writes the char to the stream s Source Edit
proc readLine*(s: AsyncStream): Future[string] {.
async.}- Reads the line from the stream s until end of stream or the new line delimiter Source Edit
proc peekLine*(s: AsyncStream): Future[string] {.
async.}- Peeks the line from the stream s until end of stream or the new line delimiter. It works only if the stream supports peekLine itself, allows getting and setting the stream position, or supports peek. Source Edit
proc writeLine*(s: AsyncStream; data: string) {.
async.}- Writes the line data to the stream s followed by the new line delimiter Source Edit
proc readAll*(s: AsyncStream): Future[string] {.
async.}- Reads the data from the stream s until its end Source Edit
proc readByte*(s: AsyncStream): Future[byte] {.
async.}- Reads byte from the stream s Source Edit
proc peekByte*(s: AsyncStream): Future[byte] {.
async.}- Peeks byte from the stream s Source Edit
proc writeByte*(s: AsyncStream; data: byte) {.
async.}- Writes byte to the stream s Source Edit
proc readInt8*(s: AsyncStream): Future[int8] {.
async.}- Reads int8 from the stream s Source Edit
proc peekInt8*(s: AsyncStream): Future[int8] {.
async.}- Peeks int8 from the stream s Source Edit
proc writeInt8*(s: AsyncStream; data: int8) {.
async.}- Writes int8 to the stream s Source Edit
proc readInt16*(s: AsyncStream): Future[int16] {.
async.}- Reads int16 from the stream s Source Edit
proc peekInt16*(s: AsyncStream): Future[int16] {.
async.}- Peeks int16 from the stream s Source Edit
proc writeInt16*(s: AsyncStream; data: int16) {.
async.}- Writes int16 to the stream s Source Edit
proc readInt32*(s: AsyncStream): Future[int32] {.
async.}- Reads int32 from the stream s Source Edit
proc peekInt32*(s: AsyncStream): Future[int32] {.
async.}- Peeks int32 from the stream s Source Edit
proc writeInt32*(s: AsyncStream; data: int32) {.
async.}- Writes int32 to the stream s Source Edit
proc readInt64*(s: AsyncStream): Future[int64] {.
async.}- Reads int64 from the stream s Source Edit
proc peekInt64*(s: AsyncStream): Future[int64] {.
async.}- Peeks int64 from the stream s Source Edit
proc writeInt64*(s: AsyncStream; data: int64) {.
async.}- Writes int64 to the stream s Source Edit
proc readUInt8*(s: AsyncStream): Future[uint8] {.
async.}- Reads uint8 from the stream s Source Edit
proc peekUInt8*(s: AsyncStream): Future[uint8] {.
async.}- Peeks uint8 from the stream s Source Edit
proc writeUInt8*(s: AsyncStream; data: uint8) {.
async.}- Writes uint8 to the stream s Source Edit
proc readUInt16*(s: AsyncStream): Future[uint16] {.
async.}- Reads uint16 from the stream s Source Edit
proc peekUInt16*(s: AsyncStream): Future[uint16] {.
async.}- Peeks uint16 from the stream s Source Edit
proc writeUInt16*(s: AsyncStream; data: uint16) {.
async.}- Writes uint16 to the stream s Source Edit
proc readUInt32*(s: AsyncStream): Future[uint32] {.
async.}- Reads uint32 from the stream s Source Edit
proc peekUInt32*(s: AsyncStream): Future[uint32] {.
async.}- Peeks uint32 from the stream s Source Edit
proc writeUInt32*(s: AsyncStream; data: uint32) {.
async.}- Writes uint32 to the stream s Source Edit
proc readUInt64*(s: AsyncStream): Future[uint64] {.
async.}- Reads uint64 from the stream s Source Edit
proc peekUInt64*(s: AsyncStream): Future[uint64] {.
async.}- Peeks uint64 from the stream s Source Edit
proc writeUInt64*(s: AsyncStream; data: uint64) {.
async.}- Writes uint64 to the stream s Source Edit
proc readInt*(s: AsyncStream): Future[int] {.
async.}- Reads int from the stream s Source Edit
proc peekInt*(s: AsyncStream): Future[int] {.
async.}- Peeks int from the stream s Source Edit
proc writeInt*(s: AsyncStream; data: int) {.
async.}- Writes int to the stream s Source Edit
proc readUInt*(s: AsyncStream): Future[uint] {.
async.}- Reads uint from the stream s Source Edit
proc peekUInt*(s: AsyncStream): Future[uint] {.
async.}- Peeks uint from the stream s Source Edit
proc writeUInt*(s: AsyncStream; data: uint) {.
async.}- Writes uint to the stream s Source Edit
proc readFloat32*(s: AsyncStream): Future[float32] {.
async.}- Reads float32 from the stream s Source Edit
proc peekFloat32*(s: AsyncStream): Future[float32] {.
async.}- Peeks float32 from the stream s Source Edit
proc writeFloat32*(s: AsyncStream; data: float32) {.
async.}- Writes float32 to the stream s Source Edit
proc readFloat64*(s: AsyncStream): Future[float64] {.
async.}- Reads float64 from the stream s Source Edit
proc peekFloat64*(s: AsyncStream): Future[float64] {.
async.}- Peeks float64 from the stream s Source Edit
proc writeFloat64*(s: AsyncStream; data: float64) {.
async.}- Writes float64 to the stream s Source Edit
proc readFloat*(s: AsyncStream): Future[float] {.
async.}- Reads float from the stream s Source Edit
proc peekFloat*(s: AsyncStream): Future[float] {.
async.}- Peeks float from the stream s Source Edit
proc writeFloat*(s: AsyncStream; data: float) {.
async.}- Writes float to the stream s Source Edit
proc readBool*(s: AsyncStream): Future[bool] {.
async.}- Reads bool from the stream s Source Edit
proc peekBool*(s: AsyncStream): Future[bool] {.
async.}- Peeks bool from the stream s Source Edit
proc writeBool*(s: AsyncStream; data: bool) {.
async.}- Writes bool to the stream s Source Edit
proc newAsyncFileStream*(fileName: string; mode = fmRead): AsyncStream
- Creates the new AsyncFileStream from the file named fileName with given mode. Source Edit
proc newAsyncFileStream*(f: AsyncFile): AsyncFileStream
- Creates the new AsyncFileStream from the AsyncFile f. Source Edit
proc `$`*(s: AsyncStringStream): string
- Converts s to string Source Edit
proc newAsyncStringStream*(data = ""): AsyncStringStream
- Creates AsyncStringStream filled with data Source Edit
proc newAsyncSocketStream*(s: AsyncSocket): AsyncSocketStream
- Creates new AsyncSocketStream from the AsyncSocket s Source Edit
proc newAsyncBufferedStream*(s: AsyncStream; buff: seq[byte]): AsyncBufferedStream
- Source Edit
proc newAsyncBufferedStream*(s: AsyncStream; buffLen = 4096): AsyncBufferedStream
- Source Edit
Templates
template wrapAsyncStream*(T: typedesc[AsyncStream]; streamFieldName: untyped): untyped
-
Copies all the operations of stream result.streamFieldName to the stream of type T. result of the calling context must point to the stream of type T.
The example of wrapping AsyncStringStream:
type
  WS = ref WSObj
  WSObj = object of AsyncStreamObj
    wrappedStream: AsyncStream

proc newWS(s: AsyncStream): WS =
  new result
  result.wrappedStream = s
  wrapAsyncStream(WS, wrappedStream)
Source Edit