We are now ready to write the executable. To do this, we will working with the following files:
Cargo.toml (manifest)
src/bin/daas-status-processing.rs (executable)
Tests
Code
Declaring the Executable
In the Cargo.toml file, after the that last [[bin]] statement for sourcing, add the following declaration.
[[bin]]
name = "status_processor"
path = "src/bin/daas-status-processing.rs"
This will tell Cargo that there is a binary file to be compiled and to name the exeutable status_processor.
Coding the Executable
Let's begin by creating a file named daas-status-processing.rs in the src/bin/ directory.
At the top of the file, we will start be declare the dependent crate with macros and the use statements.
#[macro_use] extern crate serde_derive;
use daas::{DELIMITER};
use daas::daas::{DaaSDoc};
use daas::processor::{OrderStatusProcessor};
use daas::couchdb::{CouchDB};
use serde_json::{json, Value};
use daas::KAFKA_BROKERS;
We then define the module variables for this executable.
Next we need to provide the supportive function to handle updating the status history.
fn update_order_status_history(order: Value) -> Result<bool, String>{
let category = "history".to_string();
let subcategory = "status".to_string();
let id = daas::daas::make_id(category.clone(), subcategory.clone(), order["source_name"].as_str().unwrap().to_string(), order["source_uid"].as_u64().unwrap() as usize);
let couch = CouchDB::new(DB_USER.to_string(), DB_PSWRD.to_string());
let status = StatusRecord{
name: order["data_obj"]["status"].as_str().unwrap().to_string(),
timestamp: order["last_updated"].as_u64().unwrap(),
};
match couch.get_doc_by_id(DB_NAME.to_string(), id) {
Ok(mut doc) => {
//update the status history of an existing document
let status_history = doc.data_obj_as_ref()["order_status"].as_array_mut().unwrap();
status_history.push(json!(status));
couch.upsert_doc(DB_NAME.to_string(), doc).unwrap();
Ok(true)
},
_ => {
//create a new status history document
let hist = json!({
"order_number": order["source_uid"],
"store_name": order["source_name"],
"order_type": order["subcategory"],
"timestamp": order["last_updated"],
"order_status": [status],
});
let uid = order["source_uid"].as_u64().unwrap() as usize;
let src_nme = order["source_name"].as_str().unwrap().to_string();
let athr = order["author"].as_str().unwrap().to_string();
let doc = DaaSDoc::new(src_nme, uid, category, subcategory, athr, hist);
couch.upsert_doc(DB_NAME.to_string(), doc).unwrap();
Ok(true)
}
}
}
Now, we are ready to writ ethe main() function that will be called when the executable starts.
pub fn main() {
let topic = format!("{}{}{}{}{}", "order", DELIMITER, "clothing", DELIMITER, "iStore");
let mut status_processor = OrderStatusProcessor::new(KAFKA_BROKERS.to_string(), topic, "my-group".to_string());
status_processor.start_listening(|msg|{
let value = String::from_utf8(msg.value.to_vec()).unwrap();
let order: Value = serde_json::from_str(&value).unwrap();
match update_order_status_history(order.clone()) {
Ok(_v) => println!("processed order status history for {} order number {}.", order["source_name"], order["source_uid"]),
Err(err) => println!("Couldn't process order status history for {} order number {} because of {}!", order["source_name"], order["source_uid"], err),
}
});
}
IMPORTANT
You will need to create a database named provisioning in CouchDB in order for this service to work.
You will need to create a topic named test in the Kafka broker in order for this unit tests to pass.
Try to rerun the cargo test command, and ensure that all the test pass.
Running the Microsservice
On the command line, run the cargo run --bin status_processor command to start the service. (Or open a new command terminal and start the service using the executable in the target/debug directory).
PS C:\workspace\rust-daas> cargo run --bin status_processor
Finished dev [unoptimized + debuginfo] target(s) in 0.42s
Running `target\debug\status_processor.exe`
processed order status history for "iStore" order number 8003.
processed order status history for "iStore" order number 8003.
processed order status history for "iStore" order number 8004.
``
Look in the CouchDB `provisioning` database and there should be documents for the aggregates by order number.
_Example_