tok

package module
v1.1.0
Published: Aug 26, 2025 License: MIT Imports: 17 Imported by: 6

README

tok

"talk", a library that simplifies building instant messaging (IM) applications.

Installation

go get github.com/quexer/tok

Features

  • Supports both TCP and WebSocket servers for flexible IM application deployment.
  • Modular design: core logic is decoupled from network adapters, making it easy to extend or customize.
  • Simple API for creating hubs, managing connections, and handling messages.
  • Built-in memory queue for offline message caching, with pluggable queue interface.
  • Supports single sign-on (SSO) to ensure only one active connection per user.
  • Configurable timeouts for authentication, server ping, and message reading/writing.
  • Easy integration with custom authentication logic.
  • Cluster support available via quexer/cluster.
  • Graceful connection lifecycle management with context-based cancellation.

WebSocket Engine Support

tok supports multiple WebSocket engines for flexible integration:

  • golang.org/x/net/websocket (default)
  • github.com/gorilla/websocket
  • github.com/coder/websocket (formerly nhooyr.io/websocket)

You can select the engine via configuration options. Future engines can be added easily.

Architecture

graph TB
    subgraph "Client Applications"
        C1[TCP Client]
        C2[WebSocket Client]
    end
    
    subgraph "Network Layer"
        TCP[TCP Server<br/>tcp_conn.go]
        WS[WebSocket Server<br/>ws_conn.go]
        
        subgraph "WebSocket Engines"
            WSX[x/net/websocket<br/>ws_x.go]
            WSG[gorilla/websocket<br/>ws_gorilla.go]
            WSC[coder/websocket<br/>ws_coder.go]
        end
    end
    
    subgraph "Core Layer"
        Hub[Hub<br/>hub.go]
        HubConfig[Hub Config<br/>hub_config.go]
        Actor[Actor Interface<br/>tok.go]
        Device[Device<br/>device.go]
    end
    
    subgraph "Message Queue"
        Queue[Queue Interface<br/>q.go]
        MemQ[Memory Queue<br/>memory_q.go]
    end
    
    subgraph "Handlers"
        BR[BeforeReceiveHandler]
        BS[BeforeSendHandler]
        AS[AfterSendHandler]
        CH[CloseHandler]
        PG[PingGenerator]
        BG[ByeGenerator]
    end
    
    C1 -->|TCP| TCP
    C2 -->|WebSocket| WS
    
    WS --> WSX
    WS --> WSG
    WS --> WSC
    
    TCP --> Hub
    WSX --> Hub
    WSG --> Hub
    WSC --> Hub
    
    Hub --> Actor
    Hub --> Device
    Hub --> Queue
    Hub --> HubConfig
    
    MemQ -.->|implements| Queue
    
    Actor -.->|optional| BR
    Actor -.->|optional| BS
    Actor -.->|optional| AS
    Actor -.->|optional| CH
    Actor -.->|optional| PG
    Actor -.->|optional| BG
    
    style Hub fill:#e74c3c,stroke:#333,stroke-width:4px,color:#fff
    style Actor fill:#3498db,stroke:#333,stroke-width:2px,color:#fff

Structure

  • tok.go : Entry and core types for the library.
  • hub.go : Hub logic for managing connections and message dispatch.
  • hub_config.go : Hub configuration and options.
  • tcp_conn.go : TCP server and adapter implementation.
  • ws_conn.go : WebSocket server implementation supporting multiple engines.
  • ws_gorilla.go : github.com/gorilla/websocket adapter.
  • ws_x.go : golang.org/x/net/websocket adapter.
  • ws_coder.go : github.com/coder/websocket adapter.
  • ws_option.go : WebSocket engine selection and options.
  • memory_q.go : Built-in in-memory message queue for offline messages.
  • device.go : Device abstraction for user device.
  • example/ : Example server and client implementations. See examples

Documentation

Index

Constants

This section is empty.

Variables

var ErrCacheFailed = errors.New("tok: cache error")

ErrCacheFailed is returned when a "cacheable" message is sent with a queue configured but caching the message fails.

var ErrOffline = errors.New("tok: offline")

ErrOffline is returned when a message is sent to online users only and the target user is offline. See Hub.Send.

var ErrQueueRequired = errors.New("tok: queue is required")

ErrQueueRequired is returned when a "cacheable" message is sent but no queue is configured.

var (
	// TCPMaxPackLen is the upper limit on the length of a single message.
	TCPMaxPackLen uint32 = 4 * 1024 * 1024
)

Functions

This section is empty.

Types

type Actor

type Actor interface {
	// OnReceive is called whenever the server receives a valid payload.
	// dv represents the sender of the data; data is the received byte slice.
	OnReceive(dv *Device, data []byte)
}

Actor interface is used to handle valid payloads received by the server.

type AfterSendHandler added in v1.0.5

type AfterSendHandler interface {
	// AfterSend is called after data has been sent to a device
	AfterSend(dv *Device, data []byte)
}

AfterSendHandler is an interface for handling events after sending data

type BeforeReceiveHandler added in v1.0.5

type BeforeReceiveHandler interface {
	// BeforeReceive is called to preprocess incoming data before OnReceive
	BeforeReceive(dv *Device, data []byte) ([]byte, error)
}

BeforeReceiveHandler is an interface for preprocessing incoming data before OnReceive
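
As an illustration of this preprocessing hook, here is a minimal sketch of a handler that trims surrounding whitespace and rejects empty payloads. Device and the interface are redeclared locally as stand-ins so the sketch compiles without importing tok; trimHandler and errEmpty are hypothetical names. How the hub reacts to a returned error is not specified in this documentation, so the sketch makes no assumption about it.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Device is an empty stand-in for tok.Device (assumption: only the
// pointer type matters for this sketch).
type Device struct{}

// BeforeReceiveHandler mirrors the interface documented above.
type BeforeReceiveHandler interface {
	BeforeReceive(dv *Device, data []byte) ([]byte, error)
}

// errEmpty is a hypothetical error for payloads that are empty after trimming.
var errEmpty = errors.New("empty payload")

// trimHandler is a hypothetical handler that normalizes incoming data.
type trimHandler struct{}

// BeforeReceive strips leading and trailing whitespace and rejects
// payloads that contain nothing else.
func (trimHandler) BeforeReceive(_ *Device, data []byte) ([]byte, error) {
	data = bytes.TrimSpace(data)
	if len(data) == 0 {
		return nil, errEmpty
	}
	return data, nil
}

// Compile-time check that trimHandler satisfies the interface.
var _ BeforeReceiveHandler = trimHandler{}

func main() {
	out, err := trimHandler{}.BeforeReceive(&Device{}, []byte("  hello \n"))
	fmt.Printf("%q %v\n", out, err)
}
```

With the real library, a handler like this would presumably be passed to WithHubConfigBeforeReceive when building the hub config.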

type BeforeSendHandler added in v1.0.5

type BeforeSendHandler interface {
	// BeforeSend is called to preprocess outgoing data before sending
	BeforeSend(dv *Device, data []byte) ([]byte, error)
}

BeforeSendHandler is an interface for preprocessing outgoing data before sending

type ByeGenerator added in v1.0.5

type ByeGenerator interface {
	// Bye builds the payload to notify clients before a connection is closed for a specific reason.
	// kicker is the device that initiated the kick, reason is the reason for the kick, dv is the device being kicked.
	Bye(kicker *Device, reason string, dv *Device) []byte
}

ByeGenerator is an interface for generating bye payloads

type CloseHandler added in v1.0.5

type CloseHandler interface {
	// OnClose is called after a connection has been closed
	OnClose(dv *Device)
}

CloseHandler is an interface for handling connection close events

type ConAdapter added in v1.1.0

type ConAdapter interface {
	// Read reads the next message from the connection.
	// Read should block until a message is available or an error occurs.
	// The returned data should be the complete message payload (not including any protocol framing).
	Read() ([]byte, error)

	// Write writes a message to the connection.
	// Write must be thread-safe as it may be called concurrently.
	// The data parameter is the complete message payload to send.
	Write(data []byte) error

	// Close closes the connection.
	// After Close is called, all Read and Write operations should return errors.
	Close() error

	// ShareConn returns true if this adapter shares the same underlying connection with another adapter.
	// This is used for connection deduplication in SSO (Single Sign-On) mode.
	ShareConn(adapter ConAdapter) bool
}

ConAdapter is the adapter interface for real connections. Users can implement this interface to support custom connection types beyond the built-in TCP and WebSocket.

Implementations must be thread-safe for Write operations, as Write may be called concurrently. Read operations are called sequentially from a single goroutine.
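
The contract above can be sketched with a small in-memory adapter. This is an illustration only: the interface is redeclared locally so the block compiles without importing tok, and pipeAdapter, newPipeAdapter and errClosed are hypothetical names. Write is guarded by a mutex because it may be called concurrently; Read is assumed to be called from a single goroutine, as stated above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ConAdapter mirrors the interface documented above.
type ConAdapter interface {
	Read() ([]byte, error)
	Write(data []byte) error
	Close() error
	ShareConn(adapter ConAdapter) bool
}

// errClosed is a hypothetical error returned after Close.
var errClosed = errors.New("pipe adapter: closed")

// pipeAdapter is a hypothetical in-memory adapter: Read takes messages
// from in, Write appends copies of messages to out.
type pipeAdapter struct {
	in   chan []byte
	done chan struct{}
	once sync.Once
	mu   sync.Mutex // guards out so Write is safe for concurrent use
	out  [][]byte
}

func newPipeAdapter() *pipeAdapter {
	return &pipeAdapter{in: make(chan []byte, 16), done: make(chan struct{})}
}

// Read blocks until a message is available or the adapter is closed.
func (a *pipeAdapter) Read() ([]byte, error) {
	select {
	case <-a.done: // already closed: fail even if messages are buffered
		return nil, errClosed
	default:
	}
	select {
	case <-a.done:
		return nil, errClosed
	case msg := <-a.in:
		return msg, nil
	}
}

// Write stores a copy of data; it fails once the adapter is closed.
func (a *pipeAdapter) Write(data []byte) error {
	select {
	case <-a.done:
		return errClosed
	default:
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	a.out = append(a.out, append([]byte(nil), data...))
	return nil
}

// Close is idempotent and unblocks any pending Read.
func (a *pipeAdapter) Close() error {
	a.once.Do(func() { close(a.done) })
	return nil
}

// ShareConn reports whether other is backed by this same pipe.
func (a *pipeAdapter) ShareConn(other ConAdapter) bool {
	o, ok := other.(*pipeAdapter)
	return ok && o == a
}

// Compile-time check that pipeAdapter satisfies the interface.
var _ ConAdapter = (*pipeAdapter)(nil)

func main() {
	a := newPipeAdapter()
	a.in <- []byte("hello")
	msg, _ := a.Read()
	fmt.Printf("read %q\n", msg)
	_ = a.Write([]byte("world"))
	_ = a.Close()
	_, err := a.Read()
	fmt.Println("after close:", err)
}
```

A real adapter would wrap an actual transport (for example a QUIC stream or a Unix socket) and strip any protocol framing before returning data from Read.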

type Device

type Device struct {
	// contains filtered or unexported fields
}

Device is the abstraction for a user's device.

func CreateDevice

func CreateDevice(uid interface{}, id string) *Device

CreateDevice creates a Device. uid is the user ID; id is the UUID of this device (may be empty).

func (*Device) GetMeta

func (p *Device) GetMeta(key string) string

GetMeta returns the device metadata stored under key.

func (*Device) ID

func (p *Device) ID() string

ID returns the device UUID (may be empty).

func (*Device) PutMeta

func (p *Device) PutMeta(key string, val string)

PutMeta sets the device metadata stored under key.

func (*Device) UID

func (p *Device) UID() interface{}

UID returns the user ID.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the core of tok; it dispatches messages between connections.

func CreateWsHandler

func CreateWsHandler(auth WsAuthFunc, opts ...WsHandlerOption) (*Hub, http.Handler)

CreateWsHandler creates a WebSocket HTTP handler. The auth function is used for user authorization. It returns the hub and the HTTP handler.

func Listen

func Listen(hub *Hub, config *HubConfig, addr string, auth TCPAuthFunc) (*Hub, error)

Listen creates a TCP listener with the given hub. If config is not nil, a new hub is created and replaces the old one. addr is the TCP address to listen on. The auth function is used for user authorization. An error is returned if listening fails.

func (*Hub) CheckOnline

func (p *Hub) CheckOnline(ctx context.Context, uid interface{}) bool

CheckOnline reports whether the user is online.

func (*Hub) Kick

func (p *Hub) Kick(ctx context.Context, uid interface{})

Kick closes all connections of uid.

func (*Hub) Online

func (p *Hub) Online(ctx context.Context) []interface{}

Online returns the list of online users.

func (*Hub) RegisterConnection added in v1.1.0

func (p *Hub) RegisterConnection(ctx context.Context, dv *Device, adapter ConAdapter)

RegisterConnection registers a custom connection with the hub. This method allows users to integrate their own connection types (e.g., QUIC, Unix sockets) by implementing the ConAdapter interface.

Parameters:

  • dv: The authenticated device information
  • adapter: The connection adapter implementing the ConAdapter interface

The connection will be managed by the hub and will receive messages sent to the device. The hub will handle connection lifecycle, including ping/pong if configured.

Example:

// ctx is a context.Context supplied by the caller, e.g. context.Background()
adapter := &MyCustomAdapter{conn: customConn}
device := tok.CreateDevice("user123", "session456")
hub.RegisterConnection(ctx, device, adapter)

func (*Hub) Send

func (p *Hub) Send(ctx context.Context, to interface{}, b []byte, ttl uint32) error

Send sends a message to a user. ttl is the expiry time in seconds; 0 means send to online users only. If ttl = 0 and the user is offline, ErrOffline is returned. If ttl > 0 and the user is offline, or online but the send fails, the message is cached for ttl seconds.
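
A caller-side sketch of handling the errors listed in the Variables section when calling Send. The three sentinel errors are redeclared locally, with the same messages, so the block compiles without importing tok; describeSendError is a hypothetical helper. errors.Is is used so the checks work whether or not the library wraps these errors, which this documentation does not state.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for tok.ErrOffline, tok.ErrQueueRequired and
// tok.ErrCacheFailed, with the messages shown in the Variables section.
var (
	ErrOffline       = errors.New("tok: offline")
	ErrQueueRequired = errors.New("tok: queue is required")
	ErrCacheFailed   = errors.New("tok: cache error")
)

// describeSendError is a hypothetical helper that maps the result of a
// Send call to a short, human-readable outcome.
func describeSendError(err error) string {
	switch {
	case err == nil:
		return "delivered or cached"
	case errors.Is(err, ErrOffline):
		return "recipient offline; use ttl > 0 to cache the message"
	case errors.Is(err, ErrQueueRequired):
		return "no queue configured; caching is unavailable"
	case errors.Is(err, ErrCacheFailed):
		return "queue failed to store the message"
	default:
		return "unexpected error: " + err.Error()
	}
}

func main() {
	// Simulate a wrapped ErrOffline as a caller might see it.
	err := fmt.Errorf("send to alice: %w", ErrOffline)
	fmt.Println(describeSendError(err))
}
```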

type HubConfig

type HubConfig struct {
	// contains filtered or unexported fields
}

HubConfig is the configuration struct used to create a new Hub.

func NewHubConfig added in v1.0.3

func NewHubConfig(actor Actor, opts ...HubConfigOption) *HubConfig

NewHubConfig creates a new HubConfig.

type HubConfigOption added in v1.0.3

type HubConfigOption func(*HubConfig)

func WithHubConfigAfterSend added in v1.0.5

func WithHubConfigAfterSend(afterSend AfterSendHandler) HubConfigOption

WithHubConfigAfterSend sets the optional AfterSend handler for the hub config.

func WithHubConfigAuthTimeout added in v1.0.4

func WithHubConfigAuthTimeout(timeout time.Duration) HubConfigOption

WithHubConfigAuthTimeout sets the auth timeout for the hub config. The default is 5 seconds.

func WithHubConfigBeforeReceive added in v1.0.5

func WithHubConfigBeforeReceive(hdl BeforeReceiveHandler) HubConfigOption

WithHubConfigBeforeReceive sets the optional BeforeReceive handler for the hub config.

func WithHubConfigBeforeSend added in v1.0.5

func WithHubConfigBeforeSend(beforeSend BeforeSendHandler) HubConfigOption

WithHubConfigBeforeSend sets the optional BeforeSend handler for the hub config.

func WithHubConfigByeGenerator added in v1.0.5

func WithHubConfigByeGenerator(byeGenerator ByeGenerator) HubConfigOption

WithHubConfigByeGenerator sets the optional ByeGenerator for the hub config to enable bye message generation. If it is not set, no bye messages are sent when closing connections.

func WithHubConfigCloseHandler added in v1.0.5

func WithHubConfigCloseHandler(closeHandler CloseHandler) HubConfigOption

WithHubConfigCloseHandler sets the optional CloseHandler for the hub config.

func WithHubConfigPingProducer added in v1.0.5

func WithHubConfigPingProducer(pingProducer PingGenerator) HubConfigOption

WithHubConfigPingProducer sets the optional PingGenerator for the hub config to enable the auto-ping feature. If it is not set, server-side auto-ping is disabled.

func WithHubConfigQueue added in v1.0.3

func WithHubConfigQueue(q Queue) HubConfigOption

WithHubConfigQueue sets the queue for the hub config. The default is MemoryQueue.

func WithHubConfigReadTimeout added in v1.0.4

func WithHubConfigReadTimeout(timeout time.Duration) HubConfigOption

WithHubConfigReadTimeout sets the read timeout for the hub config. The default is 0, which means no read timeout.

func WithHubConfigServerPingInterval added in v1.0.4

func WithHubConfigServerPingInterval(interval time.Duration) HubConfigOption

WithHubConfigServerPingInterval sets the ping interval for the hub config. The default is 30 seconds.

func WithHubConfigSso added in v1.0.3

func WithHubConfigSso(sso bool) HubConfigOption

WithHubConfigSso sets SSO for the hub config. The default is true.

func WithHubConfigWriteTimeout added in v1.0.4

func WithHubConfigWriteTimeout(timeout time.Duration) HubConfigOption

WithHubConfigWriteTimeout sets the write timeout for the hub config. The default is 1 minute.
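
The HubConfigOption functions above follow Go's functional options pattern. Below is a minimal sketch of how such options compose, using hypothetical stand-in types: hubConfig, its field names, and the lower-case option helpers are assumptions, since the real HubConfig fields are unexported. The default values are the ones stated in the option descriptions above.

```go
package main

import (
	"fmt"
	"time"
)

// hubConfig is a hypothetical stand-in for tok.HubConfig.
type hubConfig struct {
	authTimeout  time.Duration
	readTimeout  time.Duration
	writeTimeout time.Duration
	pingInterval time.Duration
	sso          bool
}

// option has the same shape as tok.HubConfigOption: a function that
// mutates the config being built.
type option func(*hubConfig)

func withAuthTimeout(d time.Duration) option {
	return func(c *hubConfig) { c.authTimeout = d }
}

func withSso(on bool) option {
	return func(c *hubConfig) { c.sso = on }
}

// newHubConfig starts from the documented defaults, then applies each
// option in order, so later options override earlier ones.
func newHubConfig(opts ...option) *hubConfig {
	c := &hubConfig{
		authTimeout:  5 * time.Second,  // documented default
		readTimeout:  0,                // 0 means no read timeout
		writeTimeout: time.Minute,      // documented default
		pingInterval: 30 * time.Second, // documented default
		sso:          true,             // documented default
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newHubConfig(withAuthTimeout(10*time.Second), withSso(false))
	fmt.Printf("auth=%v write=%v ping=%v sso=%v\n",
		c.authTimeout, c.writeTimeout, c.pingInterval, c.sso)
}
```

With the real library, the equivalent call would pass the documented options, such as WithHubConfigAuthTimeout and WithHubConfigSso, to NewHubConfig after the actor argument.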

type MemoryQueue added in v1.0.3

type MemoryQueue struct {
	// contains filtered or unexported fields
}

func NewMemoryQueue added in v1.0.3

func NewMemoryQueue() *MemoryQueue

func (*MemoryQueue) Close added in v1.1.0

func (mq *MemoryQueue) Close()

Close stops the cleanup routine

func (*MemoryQueue) Deq added in v1.0.3

func (mq *MemoryQueue) Deq(ctx context.Context, uid interface{}) ([]byte, error)

func (*MemoryQueue) Enq added in v1.0.3

func (mq *MemoryQueue) Enq(ctx context.Context, uid interface{}, data []byte, ttl ...uint32) error

func (*MemoryQueue) Len added in v1.0.3

func (mq *MemoryQueue) Len(ctx context.Context, uid interface{}) (int, error)

type PingGenerator added in v1.0.5

type PingGenerator interface {
	// Ping generates the server-side ping payload
	Ping() []byte
}

PingGenerator is an interface for generating server-side ping payloads

type Queue

type Queue interface {
	Enq(ctx context.Context, uid interface{}, data []byte, ttl ...uint32) error
	Deq(ctx context.Context, uid interface{}) ([]byte, error)
	Len(ctx context.Context, uid interface{}) (int, error)
}

Queue is the FIFO queue interface used by Hub.
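
Because the queue is pluggable, a custom implementation only needs these three methods. The sketch below is a hypothetical per-user FIFO kept in a map; the interface is redeclared locally so the block compiles without importing tok. Two simplifications are assumptions of this sketch, not documented tok behavior: the ttl argument is ignored, and Deq on an empty queue returns (nil, nil).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Queue mirrors the interface documented above.
type Queue interface {
	Enq(ctx context.Context, uid interface{}, data []byte, ttl ...uint32) error
	Deq(ctx context.Context, uid interface{}) ([]byte, error)
	Len(ctx context.Context, uid interface{}) (int, error)
}

// sliceQueue is a hypothetical in-memory FIFO keyed by user ID.
// uid must be a comparable value (string, int, ...) to be a map key.
type sliceQueue struct {
	mu   sync.Mutex
	msgs map[interface{}][][]byte
}

func newSliceQueue() *sliceQueue {
	return &sliceQueue{msgs: make(map[interface{}][][]byte)}
}

// Enq appends a copy of data to the user's queue. ttl is ignored here.
func (q *sliceQueue) Enq(_ context.Context, uid interface{}, data []byte, _ ...uint32) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.msgs[uid] = append(q.msgs[uid], append([]byte(nil), data...))
	return nil
}

// Deq removes and returns the oldest message for uid.
// Assumption: an empty queue yields (nil, nil).
func (q *sliceQueue) Deq(_ context.Context, uid interface{}) ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	list := q.msgs[uid]
	if len(list) == 0 {
		return nil, nil
	}
	head := list[0]
	q.msgs[uid] = list[1:]
	return head, nil
}

// Len returns the number of queued messages for uid.
func (q *sliceQueue) Len(_ context.Context, uid interface{}) (int, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.msgs[uid]), nil
}

// Compile-time check that sliceQueue satisfies the interface.
var _ Queue = (*sliceQueue)(nil)

func main() {
	ctx := context.Background()
	q := newSliceQueue()
	_ = q.Enq(ctx, "alice", []byte("first"))
	_ = q.Enq(ctx, "alice", []byte("second"), 60) // ttl accepted but ignored
	n, _ := q.Len(ctx, "alice")
	msg, _ := q.Deq(ctx, "alice")
	fmt.Printf("len=%d head=%q\n", n, msg)
}
```

A production queue would honor ttl and typically persist messages, for example in Redis or a database; with the real library it would be supplied through WithHubConfigQueue.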

type TCPAuthFunc

type TCPAuthFunc func([]byte) (*Device, error)

TCPAuthFunc is the TCP auth function. Its parameter is the content of the first packet on the connection. It returns the authenticated *Device.

type WsAuthFunc

type WsAuthFunc func(*http.Request) (*Device, error)

WsAuthFunc is the WebSocket auth function. Its parameter is the initial WebSocket request. It returns the authenticated *Device.
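
A minimal sketch of an auth function with the WsAuthFunc shape. Device and CreateDevice are redeclared locally as stand-ins so the block compiles without importing tok. The "uid" and "device" query parameters, authByQuery, newRequest and errNoUID are all hypothetical; a real application would verify a token or session rather than trust a query parameter.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
)

// Device and CreateDevice are local stand-ins for the tok types
// documented above.
type Device struct {
	uid interface{}
	id  string
}

func CreateDevice(uid interface{}, id string) *Device {
	return &Device{uid: uid, id: id}
}

func (d *Device) UID() interface{} { return d.uid }
func (d *Device) ID() string       { return d.id }

// errNoUID is a hypothetical error for requests without a user ID.
var errNoUID = errors.New("auth: missing uid")

// authByQuery has the same signature as WsAuthFunc. It reads the
// hypothetical "uid" and "device" query parameters from the request.
func authByQuery(r *http.Request) (*Device, error) {
	q := r.URL.Query()
	uid := q.Get("uid")
	if uid == "" {
		return nil, errNoUID
	}
	return CreateDevice(uid, q.Get("device")), nil
}

// newRequest builds a bare request for demonstration purposes.
func newRequest(rawQuery string) *http.Request {
	return &http.Request{URL: &url.URL{Path: "/ws", RawQuery: rawQuery}}
}

func main() {
	dv, err := authByQuery(newRequest("uid=alice&device=phone"))
	fmt.Println(dv.UID(), dv.ID(), err)

	_, err = authByQuery(newRequest(""))
	fmt.Println(err)
}
```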

type WsEngine added in v1.0.6

type WsEngine int

WsEngine represents different WebSocket engine implementations

const (
	// WsEngineX uses golang.org/x/net/websocket (default)
	WsEngineX WsEngine = iota
	// WsEngineGorilla uses github.com/gorilla/websocket
	WsEngineGorilla
	// WsEngineCoder uses github.com/coder/websocket (formerly nhooyr.io/websocket)
	WsEngineCoder
)

type WsHandler added in v1.0.3

type WsHandler struct {
	// contains filtered or unexported fields
}

type WsHandlerOption added in v1.0.3

type WsHandlerOption func(*WsHandler)

func WithWsHandlerEngine added in v1.0.6

func WithWsHandlerEngine(engine WsEngine) WsHandlerOption

WithWsHandlerEngine sets the websocket engine for ws handler

func WithWsHandlerHub added in v1.0.3

func WithWsHandlerHub(hub *Hub) WsHandlerOption

WithWsHandlerHub sets the hub for the ws handler. This hub is used only if hubConfig is nil.

func WithWsHandlerHubConfig added in v1.0.3

func WithWsHandlerHubConfig(hc *HubConfig) WsHandlerOption

WithWsHandlerHubConfig sets the hub config for the ws handler.

func WithWsHandlerTxt added in v1.0.3

func WithWsHandlerTxt(txt bool) WsHandlerOption

WithWsHandlerTxt sets text mode for the ws handler.

Directories

Path : Synopsis

  • example/coder_client : command
  • example/coder_server : command
  • example/gorilla_client : command
  • example/gorilla_server : command
  • example/xws_client : command
  • example/xws_server : command
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
