描述:
连接kafka server服务器。
语法:
kafka_subscribe (prop:value,....;topic,...;keyType, valueType)
kafka_subscribe (filename或fileObject; topic,...; keyType, valueType)
备注:
连接server服务器, 将属性参数properties按k:v形式输入,也可以将属性参数properties存放到以后缀名为”. properties”的配置文件中。
参数:
prop:value |
properties属性参数,以k:v形式输入,其属性参数有多个,可参考kafka相关文档 |
topic |
查询具体的某个topic或多个topic |
keyType |
consumer的key数据类型,缺省为string, 支持类型有int, interger, float, long, string |
filename |
以后缀名为”. properties”的属性参数文件 |
fileObject |
file文件对象 |
valueType |
consumer的key数据类型,缺省为string, 支持类型有int, interger,float, long, string |
返回值:
consumer对象
示例:
|
A |
|
1 |
=kafka_subscribe("D:/kafka_string.properties";"test1";"String","byte[]") |
用topic为test1,key为String, value为byte[]的配置文件kafka_string.propertie连接kafka server |
2 |
=kafka_subscribe(file("D:/kafka_string.properties");"test1";"String","byte[]") |
以配置文件对象方式连接kafka server,其余同上 |
3 |
=kafka_subscribe("bootstrap.servers":"192.168.0.144:9092","zookeeper.connect":"192.168.0.144:2181", "group.id":"test","zookeeper.session.timeout.ms":"4000","zookeeper.sync.time.ms":"200", "auto.commit.interval.ms":"500","auto.offset.reset":"latest", "serializer.class":"kafka.serializer.StringEncoder", "key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer"; "test1";"String","byte[]") |
以参数方式连接kafaka server, 其余同上 |