streamio

Version: v0.0.2
Published: Nov 18, 2025 License: MIT Imports: 12 Imported by: 0

README

streamio

Stream file processing in Go with predictable memory usage via disk-backed sessions and optional in-memory mode.

Features

  • Unified StreamReader/StreamWriter interfaces for bytes, files, multipart uploads, and download responses.
  • Auto-cleaned TempFile helper that implements io.Reader, io.Writer, and io.Seeker, and can be re-opened as a reader or writer at any time.
  • Session-scoped IOManager that isolates temp directories, enforces cleanup, and lets you pick output backends (TempFile vs in-memory bytes).
  • CopyStream, Output, and helper utilities for cloning artifacts, saving to disk, or passing data down a multi-stage pipeline.

Requirements

  • Go 1.23 or newer

Installation

go get github.com/dreamph/streamio

Import the module in your project with:

import "github.com/dreamph/streamio"

Quick start

ctx := context.Background()

ioManager, err := streamio.NewIOManager("/mytemp/app")
if err != nil {
	log.Fatalf("io manager: %v", err)
}
defer ioManager.Release()

session := ioManager.NewSession(uuid.New().String())
defer session.Release()

output, err := session.Do(ctx, ".txt", func(ctx context.Context, w streamio.StreamWriter) error {
	reader := streamio.NewBytesStreamReader("payload.txt", []byte("hello world"))
	_, err := streamio.CopyStream(reader, w)
	return err
})
if err != nil {
	log.Fatalf("session.Do: %v", err)
}
defer output.Cleanup()

data, err := output.Bytes()
if err != nil {
	log.Fatalf("read output: %v", err)
}
fmt.Printf("processed %d bytes\n", len(data))

Choosing a session writer

Sessions default to disk-backed temp files. Switch to an in-memory writer for small outputs, or override per invocation:

session := ioManager.NewSession(uuid.New().String(), streamio.SessionOption{
	WriterType: streamio.OutputBytes, // or streamio.OutputTempFile
})
defer session.Release()

// Override per call (session default stays intact)
output, err := session.Do(ctx, ".zip", func(ctx context.Context, w streamio.StreamWriter) error {
	reader := streamio.NewBytesStreamReader("payload.bin", payload)
	_, err := streamio.CopyStream(reader, w)
	return err
}, streamio.SessionOption{WriterType: streamio.OutputTempFile})
if err != nil {
	log.Fatalf("session.Do override: %v", err)
}
defer output.Cleanup()

License

Streamio is distributed under the MIT License. See LICENSE for details.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Copy

func Copy(src Reader, dst Writer) (int64, error)

func CopyStream

func CopyStream(src StreamReader, dst StreamWriter) (int64, error)
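
The following is a minimal sketch of what a CopyStream-style helper might do, assuming it opens the source, creates the destination, and delegates to io.Copy. The Reader and Writer interfaces are local copies of the ones documented on this page; opener, creator, copyStream, memSource, and memSink are hypothetical names, and the package's actual implementation may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Local stand-ins mirroring the Reader and Writer interfaces on this page.
type Reader interface {
	io.Reader
	io.Seeker
	io.Closer
}

type Writer interface {
	io.Writer
	io.Closer
}

// opener and creator are reduced, hypothetical versions of
// StreamReader and StreamWriter (only the method the copy needs).
type opener interface{ Open() (Reader, error) }
type creator interface{ Create() (Writer, error) }

// copyStream is an assumed re-implementation, not the package's code.
func copyStream(src opener, dst creator) (n int64, err error) {
	r, err := src.Open()
	if err != nil {
		return 0, err
	}
	defer r.Close()

	w, err := dst.Create()
	if err != nil {
		return 0, err
	}
	// Report a failed Close only if the copy itself succeeded.
	defer func() {
		if cerr := w.Close(); err == nil {
			err = cerr
		}
	}()
	return io.Copy(w, r)
}

// memSource and memSink are hypothetical in-memory endpoints.
type memSource struct{ data string }

type stringReader struct{ *strings.Reader }

func (stringReader) Close() error { return nil }

func (m memSource) Open() (Reader, error) {
	return stringReader{strings.NewReader(m.data)}, nil
}

type memSink struct{ buf bytes.Buffer }

type bufWriter struct{ *bytes.Buffer }

func (bufWriter) Close() error { return nil }

func (m *memSink) Create() (Writer, error) {
	m.buf.Reset() // start from an empty destination
	return bufWriter{&m.buf}, nil
}

func main() {
	sink := &memSink{}
	n, err := copyStream(memSource{"hello world"}, sink)
	fmt.Println(n, err, sink.buf.String())
}
```

The deferred Close on the writer matters for file-backed destinations, where buffered data may only be flushed, and errors only surfaced, when the handle is closed.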

func GenerateFileName

func GenerateFileName(fileName string) string

func NewBytesStreamWriter

func NewBytesStreamWriter() *bytesStreamWriter

NewBytesStreamWriter creates a StreamWriter that writes data into memory. Use Bytes() to retrieve the final in-memory result.

func Reset

func Reset(rs Reader) error

func WithTemp

func WithTemp(fileExtension string, fn func(*TempFile) error) (err error)

WithTemp is a lifecycle helper that runs fn with a TempFile created for the given file extension.

Types

type DownloadReaderCloser

type DownloadReaderCloser interface {
	io.Reader
	io.Closer
}

func NewDownloadReaderCloser

func NewDownloadReaderCloser(streamReader StreamReader, cleanup ...func()) (DownloadReaderCloser, error)

type FileInfo

type FileInfo struct {
	Name        string
	Size        int64
	ContentType string
}

FileInfo holds basic metadata about a file or stream.

type IOManager

type IOManager interface {
	NewSession(id string, opts ...SessionOption) Session
	Release() error // cleanup the entire baseDir
}

IOManager is the root manager for all temp files.

func NewIOManager

func NewIOManager(baseDir ...string) (IOManager, error)

NewIOManager creates a base temp directory for the whole process.

type Output

type Output struct {
	// contains filtered or unexported fields
}

func Do

func Do(ctx context.Context, outputFileExtension string, doFn func(ctx context.Context, out StreamWriter) error) (*Output, error)

func (*Output) AsStreamReader

func (o *Output) AsStreamReader() StreamReader

func (*Output) Bytes

func (o *Output) Bytes() ([]byte, error)

func (*Output) Cleanup

func (o *Output) Cleanup() error

func (*Output) Clone

func (o *Output) Clone() (*Output, error)

Clone creates a copy of this output. For file-based outputs, creates a new temp file.

func (*Output) CopyTo

func (o *Output) CopyTo(dst StreamWriter) (int64, error)

CopyTo copies the output to a destination StreamWriter.

func (*Output) IsFile

func (o *Output) IsFile() bool

IsFile reports whether this output is stored as a temp file.

func (*Output) IsInMemory

func (o *Output) IsInMemory() bool

IsInMemory reports whether this output is stored in memory.

func (*Output) Keep

func (o *Output) Keep() *Output

Keep keeps the output in the session; it is deleted on process release.

func (*Output) Path

func (o *Output) Path() (string, error)

func (*Output) Reader

func (o *Output) Reader() (io.ReadCloser, error)

func (*Output) SaveAs

func (o *Output) SaveAs(path string) error

SaveAs saves the output to a file path.

func (*Output) Size

func (o *Output) Size() (int64, error)

func (*Output) WriteTo

func (o *Output) WriteTo(w io.Writer) (int64, error)

type OutputType

type OutputType string

OutputType controls how session outputs are written.

const (
	// OutputTempFile stores results on disk (default).
	OutputTempFile OutputType = "tempfile"
	// OutputBytes keeps results in memory using a bytes buffer.
	OutputBytes OutputType = "bytes"
)
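
To show how a string-typed option like this can drive backend selection, here is a sketch that redeclares the two constants locally and switches on them. memWriter and newBackend are hypothetical, and treating the empty value as the disk default is an assumption based on the "(default)" comment above, not confirmed behavior.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// Local copies of the type and constants documented above.
type OutputType string

const (
	OutputTempFile OutputType = "tempfile"
	OutputBytes    OutputType = "bytes"
)

// memWriter is a hypothetical in-memory backend with a no-op Close.
type memWriter struct{ bytes.Buffer }

func (*memWriter) Close() error { return nil }

// newBackend is a hypothetical factory; the zero value falls back to disk.
func newBackend(kind OutputType, dir string) (io.WriteCloser, error) {
	switch kind {
	case OutputBytes:
		return &memWriter{}, nil
	case OutputTempFile, "":
		f, err := os.CreateTemp(dir, "out-*")
		if err != nil {
			return nil, err
		}
		return f, nil
	default:
		return nil, fmt.Errorf("unknown output type %q", kind)
	}
}

func main() {
	w, err := newBackend(OutputBytes, "")
	if err != nil {
		panic(err)
	}
	defer w.Close()
	fmt.Fprint(w, "small result")

	_, inMemory := w.(*memWriter)
	fmt.Println(inMemory)
}
```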

type Reader

type Reader interface {
	io.Reader
	io.Seeker
	io.Closer
}

Reader represents a readable stream that supports seeking and closing.

func OpenReader

func OpenReader(src StreamReader) (Reader, error)

OpenReader opens a StreamReader and returns its Reader.

type Session

type Session interface {
	Do(ctx context.Context, outputExt string, fn func(ctx context.Context, w StreamWriter) error, opts ...SessionOption) (*Output, error)
	Release() error
}

Session represents one request/job scope and has its own subdirectory for temp files.
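
The idea of one subdirectory per session under a shared base directory can be sketched with the standard library alone. The manager and session types below are hypothetical and simplified (for example, newSession returns an error here, unlike NewSession above); they only illustrate why releasing one session leaves the others untouched.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// manager owns a base directory; each session gets its own subdirectory.
type manager struct{ baseDir string }

type session struct{ dir string }

func newManager() (*manager, error) {
	dir, err := os.MkdirTemp("", "sketch-base-*")
	if err != nil {
		return nil, err
	}
	return &manager{baseDir: dir}, nil
}

func (m *manager) newSession(id string) (*session, error) {
	dir := filepath.Join(m.baseDir, id)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return nil, err
	}
	return &session{dir: dir}, nil
}

// release removes only this session's files.
func (s *session) release() error { return os.RemoveAll(s.dir) }

// release removes the whole base directory, including any leftover sessions.
func (m *manager) release() error { return os.RemoveAll(m.baseDir) }

func exists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

func main() {
	m, err := newManager()
	if err != nil {
		panic(err)
	}
	defer m.release()

	a, _ := m.newSession("job-a")
	b, _ := m.newSession("job-b")
	os.WriteFile(filepath.Join(a.dir, "out.txt"), []byte("a"), 0o600)

	a.release()
	fmt.Println(exists(a.dir), exists(b.dir))
}
```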

type SessionOption

type SessionOption struct {
	WriterType OutputType
}

SessionOption customizes the behavior of Session instances.

type StreamReader

type StreamReader interface {
	Open() (Reader, error)
	Meta() FileInfo
	Cleanup() error
}

StreamReader is a high-level abstraction for any data source that can be opened as a Reader (e.g. bytes, files, multipart uploads).
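
Since StreamReader is an interface, callers can presumably supply their own sources. The sketch below redeclares the documented types locally and implements them for a plain string; stringSource and describe are hypothetical, and the "text/plain" content type is an arbitrary choice for the example.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Local copies of the types documented on this page.
type FileInfo struct {
	Name        string
	Size        int64
	ContentType string
}

type Reader interface {
	io.Reader
	io.Seeker
	io.Closer
}

type StreamReader interface {
	Open() (Reader, error)
	Meta() FileInfo
	Cleanup() error
}

// stringSource is a hypothetical StreamReader backed by a string.
type stringSource struct{ name, data string }

type stringReader struct{ *strings.Reader }

func (stringReader) Close() error { return nil }

// Open returns a fresh reader on every call, so the source can be re-read.
func (s stringSource) Open() (Reader, error) {
	return stringReader{strings.NewReader(s.data)}, nil
}

func (s stringSource) Meta() FileInfo {
	return FileInfo{Name: s.name, Size: int64(len(s.data)), ContentType: "text/plain"}
}

// Cleanup has nothing to release for an in-memory source.
func (s stringSource) Cleanup() error { return nil }

// Compile-time check that stringSource satisfies the interface.
var _ StreamReader = stringSource{}

// describe reads the whole source and formats it with its metadata.
func describe(src StreamReader) (string, error) {
	r, err := src.Open()
	if err != nil {
		return "", err
	}
	defer r.Close()

	body, err := io.ReadAll(r)
	if err != nil {
		return "", err
	}
	m := src.Meta()
	return fmt.Sprintf("%s (%d bytes, %s): %s", m.Name, m.Size, m.ContentType, body), nil
}

func main() {
	out, err := describe(stringSource{name: "notes.txt", data: "hello"})
	fmt.Println(out, err)
}
```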

func NewBytesStreamReader

func NewBytesStreamReader(name string, data []byte) StreamReader

NewBytesStreamReader creates a StreamReader from a byte slice.

func NewFileStreamReader

func NewFileStreamReader(path string) (StreamReader, error)

NewFileStreamReader creates a StreamReader from a file path on disk.

func NewMultipartStreamReader

func NewMultipartStreamReader(h *multipart.FileHeader) StreamReader

NewMultipartStreamReader creates a StreamReader from a multipart.FileHeader (e.g. Fiber upload).

type StreamWriter

type StreamWriter interface {
	Create() (Writer, error)
}

StreamWriter is a high-level abstraction for any destination that can create a Writer for writing data (e.g. files, buffers, temp files).

func NewFileStreamWriter

func NewFileStreamWriter(path string) StreamWriter

NewFileStreamWriter creates a StreamWriter that writes data to a file path on disk.

type TempFile

type TempFile struct {
	Path string // path of the temp file on disk
	// contains filtered or unexported fields
}

TempFile is used for temp-file-backed I/O. Focus: rely on disk instead of RAM, clean up automatically, and share a single handle compatible with Output.Bytes().

func NewTempFile

func NewTempFile(prefix string) (*TempFile, error)

NewTempFile creates a temp file with a random name.

func NewTempFileInDir

func NewTempFileInDir(dir, prefix, filename string) (*TempFile, error)

NewTempFileInDir creates a temp file inside the provided directory.

func NewTempFileWithName

func NewTempFileWithName(prefix, filename string) (*TempFile, error)

NewTempFileWithName creates a temp file using prefix + filename.

func (*TempFile) AsOutStream

func (t *TempFile) AsOutStream() StreamWriter

AsOutStream kept for legacy naming (alias of AsStreamWriter).

func (*TempFile) AsStreamReader

func (t *TempFile) AsStreamReader() StreamReader

AsStreamReader exposes TempFile as a StreamReader.

func (*TempFile) AsStreamWriter

func (t *TempFile) AsStreamWriter() StreamWriter

AsStreamWriter exposes TempFile as a StreamWriter.

func (*TempFile) Cleanup

func (t *TempFile) Cleanup() error

Cleanup closes the handle and deletes the file from disk.

func (*TempFile) Close

func (t *TempFile) Close() error

Close closes the current handle but keeps the file on disk.

func (*TempFile) Create

func (t *TempFile) Create() (Writer, error)

Create makes TempFile usable as a StreamWriter. It always prepares a blank file before writing.

func (*TempFile) Exists

func (t *TempFile) Exists() bool

Exists reports whether the file still exists on disk.

func (*TempFile) Meta

func (t *TempFile) Meta() FileInfo

Meta returns the FileInfo of the temp file.

func (*TempFile) Open

func (t *TempFile) Open() (Reader, error)

Open makes TempFile usable as a StreamReader. It returns a Reader that supports Seek and Close.

func (*TempFile) Read

func (t *TempFile) Read(p []byte) (int, error)

Read implements io.Reader.

func (*TempFile) Seek

func (t *TempFile) Seek(offset int64, whence int) (int64, error)

Seek implements io.Seeker.

func (*TempFile) Size

func (t *TempFile) Size() int64

Size returns the file size on disk (bytes).

func (*TempFile) Write

func (t *TempFile) Write(p []byte) (int, error)

Write implements io.Writer.

type Writer

type Writer interface {
	io.Writer
	io.Closer
}

Writer represents a writable stream that supports closing.

func OpenWriter

func OpenWriter(dest StreamWriter) (Writer, error)

OpenWriter opens a StreamWriter and returns its Writer.
