java - How to set offset committed by the consumer group using Spark's Direct Stream for Kafka? -


I am trying to use Spark's direct approach (no receivers) for Kafka, and I have the following Kafka configuration map:

configMap.put("zookeeper.connect", "192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset", "smallest");
configMap.put("auto.commit.enable", "true");
configMap.put("topics", "ipdr31");
configMap.put("kafka.consumer.id", "kafkasparkuser");
configMap.put("bootstrap.servers", "192.168.50.124:9092");

Now the objective is: if the Spark pipeline crashes and is started again, the stream should start from the latest offset committed by the consumer group. For that purpose, I want to specify the starting offsets for the consumer. I have the information about the offsets committed in each partition. How can I supply this information to the streaming function? Currently I am using:

JavaPairInputDStream<byte[], byte[]> kafkaData =
    KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, configMap, topic);

Look at the second form of createDirectStream in the Spark API docs - it lets you pass in a Map<TopicAndPartition, Long>, where the Long is the starting offset for that partition.
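Here is a minimal sketch of that overload, assuming the 0.8 direct-stream integration (spark-streaming-kafka), the js streaming context and configMap from the question, and made-up offset values standing in for the ones you have stored:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Starting offsets read back from wherever you stored them (values here are examples)
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("ipdr31", 0), 1234L);
fromOffsets.put(new TopicAndPartition("ipdr31", 1), 5678L);

JavaInputDStream<byte[]> kafkaData = KafkaUtils.createDirectStream(
    js,
    byte[].class, byte[].class,
    DefaultDecoder.class, DefaultDecoder.class,
    byte[].class,                               // record type produced by the message handler
    configMap,
    fromOffsets,
    new Function<MessageAndMetadata<byte[], byte[]>, byte[]>() {
        @Override
        public byte[] call(MessageAndMetadata<byte[], byte[]> msg) {
            return msg.message();               // keep only the message value; adapt as needed
        }
    });

Note that this overload returns a JavaInputDStream of whatever the message handler produces, rather than the JavaPairInputDStream you get from the simpler form.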

Note that Spark does not automatically update offsets in ZooKeeper when using the direct stream - you have to write them yourself, either to ZK or to some other database. Unless you have a strict requirement of exactly-once semantics, it is easier to use the createStream method to get a DStream, in which case Spark will update the offsets in ZK and resume from the last stored offset in case of failure.
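A sketch of how those offsets could be captured for storing, assuming the stream built above and a Spark version whose foreachRDD accepts a VoidFunction; storeOffset is a hypothetical helper standing in for your ZK or database write:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

kafkaData.foreachRDD(new VoidFunction<JavaRDD<byte[]>>() {
    @Override
    public void call(JavaRDD<byte[]> rdd) {
        // RDDs produced by the direct stream know which Kafka offset ranges they cover
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange range : ranges) {
            // Persist topic, partition and the end offset of this batch to ZK or a database
            storeOffset(range.topic(), range.partition(), range.untilOffset());
        }
    }
});

On restart you would read those stored offsets back and pass them as the fromOffsets map shown earlier.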

