Section II - executable - consuming
extern crate kafka;
extern crate clap;use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use clap::{Arg, App};
use std::path::Path;
use test_data_generation::{Profile, shared};fn make_consumer(data_topic: String) -> Result<Consumer, kafka::Error> {
Consumer::from_hosts(vec!("localhost:9092".to_owned()))
.with_topic(data_topic.to_owned())
.with_fallback_offset(FetchOffset::Earliest)
.with_group("my-group".to_owned())
.with_offset_storage(GroupOffsetStorage::Kafka)
.create()
}fn analyse_data(topic: &str, data: &str) -> Result<bool, std::io::Error>{
let profile_file = shared::string_to_static_str(format!("{}/{}", WORKSPACE_LOCAL_STORAGE, topic));
let mut profile = match Path::new(&format!("{}/{}.json", WORKSPACE_LOCAL_STORAGE, topic)).exists() {
true => {
// use existing file
Profile::from_file(&profile_file)
},
false => {
// create new file
println!("Creating new profile: {}",topic);
Profile::new_with_id(topic.to_string())
},
};
profile.analyze(&data);
profile.pre_generate();
profile.save(&profile_file)
}Last updated