rexQueue

package
v1.0.90 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
// Values of ConsumerMode.
const (
	ModeProducerOnly      ConsumerMode = "producer"
	ModeConsumerGroup     ConsumerMode = "group"
	ModeSimpleConsumer    ConsumerMode = "simple"
	ModePartitionConsumer ConsumerMode = "partition"
)

// Values of ProducerMode.
const (
	ProducerModeSync  ProducerMode = "sync"
	ProducerModeAsync ProducerMode = "async"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerMode

// ConsumerMode is a string-based type. The package declares four constants
// of this type: ModeProducerOnly ("producer"), ModeConsumerGroup ("group"),
// ModeSimpleConsumer ("simple") and ModePartitionConsumer ("partition").
type ConsumerMode string

type EasyConsumerGroupHandler

// EasyConsumerGroupHandler is a struct with no exported fields.
// Its pointer type is listed with Setup, Cleanup and ConsumeClaim methods.
// NOTE(review): that method set presumably lets it act as a
// sarama.ConsumerGroupHandler — confirm against the package source.
type EasyConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

func (*EasyConsumerGroupHandler) Cleanup

func (*EasyConsumerGroupHandler) ConsumeClaim

func (*EasyConsumerGroupHandler) Setup

type KafkaConfig

// KafkaConfig groups a consumer mode, a producer mode, broker addresses,
// topics and a group ID together with an embedded *sarama.Config.
//
// NOTE(review): the "default=" option in the json tags is not part of
// encoding/json; it is presumably read by a separate configuration loader —
// confirm which one before relying on these defaults.
type KafkaConfig struct {
	// ConsumerMode holds a ConsumerMode value (see the Mode* constants).
	ConsumerMode ConsumerMode
	// ProducerMode holds a ProducerMode value (see the ProducerMode* constants).
	ProducerMode ProducerMode
	// Brokers is a list of strings; its tag declares the default [localhost:29092].
	Brokers      []string `json:",default=[localhost:29092]"`
	// Topics is a list of strings; its tag declares an empty default.
	Topics       []string `json:",default=[]"`
	// GroupId is a string; its tag declares the default "default_group".
	GroupId      string   `json:",default=default_group"`
	// Config is embedded, so its exported fields and methods are promoted
	// onto KafkaConfig. It is a pointer and may be nil.
	*sarama.Config
}

func Default

// Default takes broker addresses, a group ID and topics and returns a *KafkaConfig.
// NOTE(review): the body is not shown here; which consumer/producer modes and
// sarama settings it selects must be confirmed in the source.
func Default(brokers []string, groupId string, topics []string) *KafkaConfig

func NewConfig

// NewConfig takes a consumer mode, a producer mode, broker addresses, a group
// ID, topics and a *sarama.Config, and returns a *KafkaConfig.
// NOTE(review): presumably each argument populates the matching KafkaConfig
// field; handling of a nil c is not visible here — confirm in the source.
func NewConfig(consumerMode ConsumerMode, producerMode ProducerMode, brokers []string, groupId string, topics []string, c *sarama.Config) *KafkaConfig

func (*KafkaConfig) With

// With takes a *sarama.Config and returns a *KafkaConfig.
// NOTE(review): presumably it replaces the embedded sarama config and returns
// the receiver for chaining — confirm in the source.
func (c *KafkaConfig) With(conf *sarama.Config) *KafkaConfig

func (*KafkaConfig) WithConsumerMode

// WithConsumerMode takes a ConsumerMode and returns a *KafkaConfig.
// NOTE(review): presumably it sets the ConsumerMode field and returns the
// receiver for chaining — confirm in the source.
func (c *KafkaConfig) WithConsumerMode(mode ConsumerMode) *KafkaConfig

func (*KafkaConfig) WithProducerMode

// WithProducerMode takes a ProducerMode and returns a *KafkaConfig.
// NOTE(review): presumably it sets the ProducerMode field and returns the
// receiver for chaining — confirm in the source.
func (c *KafkaConfig) WithProducerMode(mode ProducerMode) *KafkaConfig

type KafkaQueue added in v1.0.18

// KafkaQueue is the interface returned by NewKafkaQueue. Its methods expose
// the underlying sarama producers and consumers, start consumption, and send
// messages synchronously or asynchronously, each with a context-taking
// ...Ctx variant.
//
// NOTE(review): WithAsyncProducerErrFunc returns the unexported type
// *defaultKafkaQueue, so code outside this package cannot name that result
// type or implement this interface. Changing it would break the current
// method set, so it is only flagged here.
// NOTE(review): GetASyncProducer is cased differently from AsyncSendMessage
// ("ASync" vs "Async").
type KafkaQueue interface {
	// WithAsyncProducerErrFunc accepts a callback that receives an error.
	WithAsyncProducerErrFunc(asyncProducerErrFunc func(err error)) *defaultKafkaQueue
	// Accessors for the sarama clients. NOTE(review): which of these are
	// non-nil presumably depends on the configured modes — confirm.
	GetASyncProducer() sarama.AsyncProducer
	GetSyncProducer() sarama.SyncProducer
	GetConsumer() sarama.Consumer
	GetConsumerGroup() sarama.ConsumerGroup
	// Close returns an error; what it releases is not visible here.
	Close() error
	// CatchAsyncErr accepts a callback that receives an error.
	CatchAsyncErr(asyncProducerErrFunc func(err error))
	// Consume takes a caller-supplied sarama.ConsumerGroupHandler; EasyConsume
	// takes a per-message callback instead. Neither returns a value.
	Consume(ctx context.Context, consumerGroup sarama.ConsumerGroup, topics []string, consumerGroupHandler sarama.ConsumerGroupHandler)
	EasyConsume(ctx context.Context, consumerGroup sarama.ConsumerGroup, topics []string, readMsgFunc func(msg *sarama.ConsumerMessage))
	// Synchronous sends return the partition, the offset and an error. The
	// Easy* forms take topic, key and value strings instead of a
	// *sarama.ProducerMessage.
	SyncSendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
	SyncSendMessageCtx(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
	EasySyncSendMessage(topic, key, value string) (partition int32, offset int64, err error)
	EasySyncSendMessageCtx(ctx context.Context, topic, key, value string) (partition int32, offset int64, err error)
	// Asynchronous sends return only an error.
	AsyncSendMessage(msg *sarama.ProducerMessage) (err error)
	AsyncSendMessageCtx(ctx context.Context, msg *sarama.ProducerMessage) (err error)
	EasyAsyncSendMessage(topic, key, value string) (err error)
	EasyAsyncSendMessageCtx(ctx context.Context, topic, key, value string) (err error)
}

func NewKafkaQueue added in v1.0.18

// NewKafkaQueue takes a *KafkaConfig and returns a KafkaQueue or an error.
// NOTE(review): the body is not shown here; which sarama clients it creates
// for each mode, and how it treats a nil conf, must be confirmed in the source.
func NewKafkaQueue(conf *KafkaConfig) (KafkaQueue, error)

type ProducerMode

// ProducerMode is a string-based type. The package declares two constants of
// this type: ProducerModeSync ("sync") and ProducerModeAsync ("async").
type ProducerMode string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL