大数据处理项目相关
|
使用librdkafka开发一个producer的步骤:
一些数据结构的解释
Brokers名字的形式为:host:port; 其中port是可选的,默认是9092,host是任何一个可以解析的hostname或者ipv4或者ipv6地址。如果host是多个地址,librdkafka将会在每一次连接尝试中循环连接这些地址。包含所有broker 地址的DNS记录可以用来提供可靠的bootstrap broker。
一些函数* rd_kafka_consume_start()函数的参数: rkt: 进行consume的topic, 由前面rd_kafka_topic_new()创建 partition:进行consume的partition offset:开始consume的消息偏移。这个偏移可能是一个绝对消息偏移,或者是RD_KAKFA_OFFSET_STORED来使用存储的offset,也可能是两个特定偏移之一:RD_KAFKA_OFFSET_BEGINNING,从partition消息队列的开始进行consume;RD_KAFKA_OFFSET_END:从partition中的将要produce的下一条信息开始(忽略即当前所有的消息)。 在topic+partition的consumer启动之后,librdkafka将尝试使本地消息队列中的消息数目保持在queued.min.messages,一方反复的从broker获取消息。 本地消息队列将通过以下三种不同的consum APIs进行consume: rd_kafka_consume():每次consume一条消息 rd_kafka_consume_batch():批处理consume,一条或多条 rd_kafka_consume_callback():consume本地消息队列中的所有消息,并调用回调函数处理每条消息 上述三种方式按照性能排列的,rd_kafka_consume()是最慢的,rd_kafka_consume_callback()最快。不同的需求可以选择不同的实现方式。 一条consumed消息,由每一个consume函数提供或返回,具体是由rd_kafka_messag_t类型对象保存。</big> sparkstreaming 部分
(编辑:安卓应用网_ASP源码网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |

