Documentation
¶
Index ¶
Constants ¶
View Source
// Declared values for ConsumerMode ("producer", "group", "simple", "partition")
// and for ProducerMode ("sync", "async").
// NOTE(review): ModeProducerOnly is typed as a ConsumerMode; presumably it means
// "create no consumer" — confirm against the NewKafkaQueue implementation.
const ( ModeProducerOnly ConsumerMode = "producer" ModeConsumerGroup ConsumerMode = "group" ModeSimpleConsumer ConsumerMode = "simple" ModePartitionConsumer ConsumerMode = "partition" ProducerModeSync ProducerMode = "sync" ProducerModeAsync ProducerMode = "async" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerMode ¶
// ConsumerMode is a string-backed type naming a consumer mode. The package
// declares ModeProducerOnly, ModeConsumerGroup, ModeSimpleConsumer and
// ModePartitionConsumer as its values.
type ConsumerMode string
type EasyConsumerGroupHandler ¶
// EasyConsumerGroupHandler is a struct whose Setup, Cleanup and ConsumeClaim
// methods have the signatures required by sarama.ConsumerGroupHandler.
// NOTE(review): presumably this is the handler used by KafkaQueue.EasyConsume —
// confirm in the implementation.
type EasyConsumerGroupHandler struct {
// contains filtered or unexported fields
}
func (*EasyConsumerGroupHandler) Cleanup ¶
func (h *EasyConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
func (*EasyConsumerGroupHandler) ConsumeClaim ¶
func (h *EasyConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*EasyConsumerGroupHandler) Setup ¶
func (h *EasyConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
type KafkaConfig ¶
// KafkaConfig groups the consumer and producer mode selection, the broker
// addresses, the topics and the consumer group ID together with an embedded
// *sarama.Config.
type KafkaConfig struct {
// ConsumerMode holds one of the ConsumerMode values.
ConsumerMode ConsumerMode
// ProducerMode holds one of the ProducerMode values.
ProducerMode ProducerMode
// Brokers lists broker addresses; the tag declares a default of [localhost:29092].
// NOTE(review): the "default=" tag option is not interpreted by encoding/json;
// presumably a config loader (e.g. go-zero's conf package) reads it — confirm.
Brokers []string `json:",default=[localhost:29092]"`
// Topics lists topic names; the tag declares an empty list as the default.
Topics []string `json:",default=[]"`
// GroupId is the consumer group identifier; the tag declares "default_group" as the default.
GroupId string `json:",default=default_group"`
// The embedded *sarama.Config promotes its fields and methods onto KafkaConfig.
// NOTE(review): accessing a promoted field panics if this pointer is nil —
// confirm NewConfig/With always populate it.
*sarama.Config
}
func NewConfig ¶
func NewConfig(consumerMode ConsumerMode, producerMode ProducerMode, brokers []string, groupId string, topics []string, c *sarama.Config) *KafkaConfig
func (*KafkaConfig) With ¶
func (c *KafkaConfig) With(conf *sarama.Config) *KafkaConfig
func (*KafkaConfig) WithConsumerMode ¶
func (c *KafkaConfig) WithConsumerMode(mode ConsumerMode) *KafkaConfig
func (*KafkaConfig) WithProducerMode ¶
func (c *KafkaConfig) WithProducerMode(mode ProducerMode) *KafkaConfig
type KafkaQueue ¶ added in v1.0.18
// KafkaQueue is the interface returned by NewKafkaQueue. Its methods return the
// underlying sarama producers and consumers, run consumer-group consumption,
// and send messages through the sync or async producer.
type KafkaQueue interface {
// WithAsyncProducerErrFunc accepts a callback that receives an error and
// returns *defaultKafkaQueue.
// NOTE(review): the return type is unexported, so callers outside the package
// cannot name it and the chained value is no longer typed as KafkaQueue —
// confirm this is intended.
WithAsyncProducerErrFunc(asyncProducerErrFunc func(err error)) *defaultKafkaQueue
// GetASyncProducer returns a sarama.AsyncProducer.
// NOTE(review): the "ASync" casing differs from the AsyncSendMessage methods below.
GetASyncProducer() sarama.AsyncProducer
// GetSyncProducer returns a sarama.SyncProducer.
GetSyncProducer() sarama.SyncProducer
// GetConsumer returns a sarama.Consumer.
GetConsumer() sarama.Consumer
// GetConsumerGroup returns a sarama.ConsumerGroup.
GetConsumerGroup() sarama.ConsumerGroup
// NOTE(review): for the four getters above, presumably the result is nil when
// the configured mode did not create that client — confirm.
// Close returns an error; presumably it closes the underlying clients — confirm.
Close() error
// CatchAsyncErr accepts a callback that receives an error.
// NOTE(review): presumably it feeds async-producer errors to the callback — confirm.
CatchAsyncErr(asyncProducerErrFunc func(err error))
// Consume takes a context, a consumer group, the topics and a
// sarama.ConsumerGroupHandler. It has no return value, so failures are not
// reported to the caller through a result.
Consume(ctx context.Context, consumerGroup sarama.ConsumerGroup, topics []string, consumerGroupHandler sarama.ConsumerGroupHandler)
// EasyConsume takes the same context, consumer group and topics as Consume but
// accepts a callback receiving a *sarama.ConsumerMessage instead of a handler.
EasyConsume(ctx context.Context, consumerGroup sarama.ConsumerGroup, topics []string, readMsgFunc func(msg *sarama.ConsumerMessage))
// SyncSendMessage takes a *sarama.ProducerMessage and returns a partition, an
// offset and an error.
SyncSendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
// SyncSendMessageCtx is the variant of SyncSendMessage that also accepts a context.
SyncSendMessageCtx(ctx context.Context, msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
// EasySyncSendMessage takes topic, key and value strings instead of a
// *sarama.ProducerMessage and returns a partition, an offset and an error.
EasySyncSendMessage(topic, key, value string) (partition int32, offset int64, err error)
// EasySyncSendMessageCtx is the variant of EasySyncSendMessage that also accepts a context.
EasySyncSendMessageCtx(ctx context.Context, topic, key, value string) (partition int32, offset int64, err error)
// AsyncSendMessage takes a *sarama.ProducerMessage and returns only an error.
AsyncSendMessage(msg *sarama.ProducerMessage) (err error)
// AsyncSendMessageCtx is the variant of AsyncSendMessage that also accepts a context.
AsyncSendMessageCtx(ctx context.Context, msg *sarama.ProducerMessage) (err error)
// EasyAsyncSendMessage takes topic, key and value strings instead of a
// *sarama.ProducerMessage and returns only an error.
EasyAsyncSendMessage(topic, key, value string) (err error)
// EasyAsyncSendMessageCtx is the variant of EasyAsyncSendMessage that also accepts a context.
EasyAsyncSendMessageCtx(ctx context.Context, topic, key, value string) (err error)
}
func NewKafkaQueue ¶ added in v1.0.18
func NewKafkaQueue(conf *KafkaConfig) (KafkaQueue, error)
type ProducerMode ¶
// ProducerMode is a string-backed type naming a producer mode. The package
// declares ProducerModeSync and ProducerModeAsync as its values.
type ProducerMode string
Click to show internal directories.
Click to hide internal directories.