Monday 4 December 2017

Kafka Automation

Kafka consumer client automation - Java

As technology moves towards microservices, integration becomes one of the areas most prone to major failures. That said, this architecture brings a lot of flexibility to development.

Most consumers of these services follow a B2B model; a few are customer-facing, but that again depends on the product.

Below is the architecture of how services are consumed and used for other streams.






Providers keep pushing data to Kafka, and consumers keep polling Kafka for updates and then use that data.

How do we automate this?
First, you cannot read it as a buffered stream: our code has no end-of-stream marker to look for, so it keeps polling and never terminates, and the connection has to be terminated manually.
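To make the termination problem concrete, here is a minimal sketch that uses a java.util.concurrent.BlockingQueue as a stand-in for a topic. This is an assumption for illustration only; it is not Kafka, and the class and method names are hypothetical. A blocking take() would wait forever once the data runs out, much like the buffered read described above, whereas a timed poll returns null after the timeout and gives the code a place to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a topic: a stream with no end-of-stream marker.
class TimedReadSketch {

    // Reads messages until a timed poll returns nothing, then stops.
    // A plain queue.take() here would block forever once the queue is empty.
    static List<String> readUntilQuiet(BlockingQueue<String> queue, long timeoutMs) {
        List<String> received = new ArrayList<>();
        try {
            String message;
            while ((message = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
                received.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was read so far.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("order-created");
        queue.add("order-shipped");
        System.out.println(readUntilQuiet(queue, 200));
    }
}
```

The timeout is the only signal that the data has run out, so it should be longer than the longest gap you expect between two messages.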

Hence we'll have to use the Kafka client JARs to capture the notifications. Below is the sample code.

Jar: kafka-clients (package org.apache.kafka.clients.consumer)

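import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // A random group id has no committed offsets, so every run starts from the earliest record
        props.put("group.id", UUID.randomUUID().toString());
        // Setting from the old consumer; KafkaConsumer does not use it, the poll timeout below bounds each wait
        props.put("consumer.timeout.ms", 2000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("topic"));
        boolean flag = true; // true while the last poll returned records
        int counter = 0;     // number of polls so far
        try {
            while (flag) {
                // On kafka-clients 2.0 and later, use poll(Duration.ofMillis(1000)) instead
                ConsumerRecords<String, String> records = consumer.poll(1000);
                flag = false;
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                        flag = true;
                    }
                }
                counter++;
                // Pause briefly after an empty poll; the loop condition then ends the read
                if (counter != 20 && !flag) {
                    Thread.sleep(1000);
                }
                // Hard cap of 20 polls so the read cannot run forever
                if (counter == 20) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close();
        }
    }
}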

The snippet above reads the notifications. The exit criteria are a boolean and a counter: the loop ends when a poll returns no records (the flag stays false) or when the counter reaches 20 polls.
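The same exit criteria can be pulled out into a small helper that is easy to check without a broker. The following is only a sketch under stated assumptions: the BoundedPollSketch class and its drain method are hypothetical names, and the Supplier stands in for whatever call fetches the next batch (for example, a wrapper around the consumer's poll).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: the poller stands in for a call that fetches the next batch.
class BoundedPollSketch {

    // Collects batches until one comes back empty or maxPolls is reached.
    static <T> List<T> drain(Supplier<List<T>> poller, int maxPolls) {
        List<T> collected = new ArrayList<>();
        for (int polls = 0; polls < maxPolls; polls++) {
            List<T> batch = poller.get();
            if (batch.isEmpty()) {
                break; // nothing new arrived: treat this as the end
            }
            collected.addAll(batch);
        }
        return collected;
    }

    public static void main(String[] args) {
        // Fake source: two batches with data, then nothing.
        Iterator<List<String>> source = Arrays.asList(
                Arrays.asList("n1", "n2"),
                Arrays.asList("n3")).iterator();
        Supplier<List<String>> poller =
                () -> source.hasNext() ? source.next() : Collections.<String>emptyList();
        System.out.println(drain(poller, 20)); // prints [n1, n2, n3]
    }
}
```

Keeping the loop separate from the Kafka client means the stop conditions (an empty batch or the poll cap) can be tested on their own, and the automation test can then assert on the returned list.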
