Blob not found
Hi, 👋
Using the code below, I can consistently reproduce a "Blob not found" error.
use std::str::FromStr;

use iroh::{
    base::base32,
    client::{docs::Entry, Doc, Iroh},
    docs::{store::Query, NamespaceSecret},
    node::Node,
};

use clap::{Parser, Subcommand};
use indicatif::HumanBytes;
use rand::Rng;
use tokio_stream::StreamExt;

pub static GLOBAL_NAMESPACE: &str = "q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run().await?;
    Ok(())
}

async fn run() -> anyhow::Result<()> {
    let opts = Opts::parse();

    let storage_path = std::env::current_dir().unwrap().join("data");
    tokio::fs::create_dir_all(&storage_path).await?;

    // Initialize node
    let node = Node::persistent(&storage_path).await?.spawn().await?;
    let client = node.client();

    // Get author, `authors().default()` doesn't work
    let author_id = if let Some(first_author) = node.client().authors().list().await?.next().await {
        first_author?
    } else {
        client.authors().create().await?
    };

    match opts.cmd.as_ref() {
        Some(Command::Add { description }) => {
            let document = get_document(client).await?;
            let todo_id = gen_id().to_string();
            println!("- [ ] {}: {}", &todo_id, &description);

            // Write
            document
                .set_bytes(author_id, todo_id, description.to_owned())
                .await?;

            let document_id = document.id().to_string();
            println!("\ndocument id: {document_id}");
            println!("author id: {author_id}");
        }
        None => {
            let document = get_document(client).await?;
            let document_id = document.id().to_string();
            println!("::: Listing entry for document_id: {document_id}");

            let mut stream = document
                .get_many(Query::single_latest_per_key())
                .await
                .unwrap();
            while let Some(entry) = stream.try_next().await.unwrap() {
                println!("entry {}", fmt_entry(&entry));
                let content = entry.content_bytes(&document).await;
                println!(
                    " content {:?}",
                    std::str::from_utf8(
                        &content
                            .map(|x| x.to_vec())
                            .unwrap_or_else(|e| format!("[Error! {e}]").into_bytes())
                    )
                    .unwrap()
                )
            }
        }
    }

    Ok(())
}

async fn get_document(client: &Iroh) -> anyhow::Result<Doc> {
    let document = client
        .docs()
        .import_namespace(iroh::docs::Capability::Write(
            NamespaceSecret::from_str(GLOBAL_NAMESPACE).unwrap(),
        ))
        .await?;
    println!("::: Using namespace '{GLOBAL_NAMESPACE}'");
    Ok(document)
}

fn fmt_entry(entry: &Entry) -> String {
    let id = entry.id();
    let key = std::str::from_utf8(id.key()).unwrap_or("<bad key>");
    let author = id.author().fmt_short();
    let hash = entry.content_hash();
    let hash = base32::fmt_short(hash.as_bytes());
    let len = HumanBytes(entry.content_len());
    format!("@{author}: {key} = {hash} ({len})",)
}

fn gen_id() -> i32 {
    let mut rng = rand::thread_rng();
    rng.gen_range(1..=90)
}

#[derive(Parser)]
#[command(name = "todos", version)]
pub struct Opts {
    #[command(subcommand)]
    pub cmd: Option<Command>,
}

#[derive(Subcommand)]
pub enum Command {
    /// Add a new todo task
    Add {
        #[arg(short, long)]
        description: String,
    },
}
[dependencies]
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.15"
iroh = "0.19.0"
anyhow = "1.0.86"
thiserror = "1.0"
clap = { version = "4.5.8", features = ["derive"] }
rand = "0.8.5"
indicatif = "0.17.8"
iroh-blobs = "0.19.0"
How to reproduce
🐡 ❯ cargo run -- add --description "Clean my room"
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
- [ ] 55: Clean my room
🐡 ❯ cargo run -- add --description "Clean my room"
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
- [ ] 43: Clean my room
🐡 ❯ cargo run --
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
::: Listing entry for document_id: wqgmhfsmbxwhjmouwnu5vndvt6mp6giqxngj3ox6u4cv2l4qlola
entry @sn5lby3yeaobojnr: 43 = hplv3aekrv7dmvah (13 B)
content "Clean my room"
entry @sn5lby3yeaobojnr: 55 = hplv3aekrv7dmvah (13 B)
content "Clean my room"
🐡 ❯ cargo r add --description "How does it work"
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
- [ ] 47: How does it work
document id: wqgmhfsmbxwhjmouwnu5vndvt6mp6giqxngj3ox6u4cv2l4qlola
author id: sn5lby3yeaobojnrhkzt7xlrg6oonpxeb3kfuxdsb5nbvath3m4a
🐡 ❯ cargo run --
::: Using namespace 'q4hiommvh3ttec3x2y7h4le5tkx2tee762s6miu4rer6d2asi4la'
::: Listing entry for document_id: wqgmhfsmbxwhjmouwnu5vndvt6mp6giqxngj3ox6u4cv2l4qlola
entry @sn5lby3yeaobojnr: 43 = hplv3aekrv7dmvah (13 B)
content "Clean my room"
entry @sn5lby3yeaobojnr: 47 = 4bhitlnjfq23tbld (16 B)
content "[Error! Blob not found]"
entry @sn5lby3yeaobojnr: 55 = hplv3aekrv7dmvah (13 B)
content "Clean my room"
As you can see, running the same command multiple times results in Blob not found.
Try running cargo r add --description "How does it work" multiple times, then list the content with cargo run --. You will find content "[Error! Blob not found]".
Credits
- @zicklag
Hi @azzamsa, thank you for the detailed report, I can replicate the issue and am now digging in.
Hi,
I found the cause of your issue. You have to make sure to always call node.shutdown().await before exiting the process; otherwise, pending write transactions in the blobs or docs store may not yet be persisted. A while ago we changed the stores to not flush the storage after each write, but only after ~500ms or when shutdown is called. This greatly improves performance of write-heavy workloads.
However, we really should clarify in the docs that shutdown must be called to make sure operations are persisted. Or we find a way to do this reliably on drop, though this is hairy because Rust doesn't have async drop (yet).
In any case, for now the fix to your example would look like this:
async fn run() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    let opts = Opts::parse();

    let storage_path = std::env::current_dir().unwrap().join("data");
    tokio::fs::create_dir_all(&storage_path).await?;

    // Initialize node
    let node = Node::persistent(&storage_path).await?.spawn().await?;

    let res = run_inner(opts, &node).await;

    // Shutdown the node to make sure all writes are flushed.
    if let Err(err) = node.shutdown().await {
        println!("Error during shutdown: {err:?}");
    }
    res
}

async fn run_inner<D: iroh::blobs::store::Store>(opts: Opts, node: &Node<D>) -> anyhow::Result<()> {
    let client = node.client();
    // your existing code from `fn run`
    Ok(())
}
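To illustrate why a write can vanish when the process exits without a shutdown, here is a minimal, self-contained sketch of a store that batches writes in memory. It is not iroh's implementation: `BatchedStore`, `Disk`, and all method names are hypothetical, the "disk" is just a `HashMap`, and the timer-based flush (~500ms in the description above) is left out so that only the shutdown path is shown.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for durable storage (assumption, not iroh's store).
type Disk = HashMap<String, Vec<u8>>;

/// Hypothetical store that keeps writes in memory until they are flushed.
struct BatchedStore<'a> {
    disk: &'a mut Disk,
    pending: Vec<(String, Vec<u8>)>,
}

impl<'a> BatchedStore<'a> {
    fn open(disk: &'a mut Disk) -> Self {
        Self { disk, pending: Vec::new() }
    }

    /// Queue a write. Nothing reaches the disk yet.
    fn write(&mut self, key: &str, value: &[u8]) {
        self.pending.push((key.to_string(), value.to_vec()));
    }

    /// Move all queued writes to the disk.
    fn flush(&mut self) {
        for (key, value) in self.pending.drain(..) {
            self.disk.insert(key, value);
        }
    }

    /// Flush and consume the store, mirroring the role of an explicit shutdown.
    fn shutdown(mut self) {
        self.flush();
    }
}

fn main() {
    let mut disk = Disk::new();

    // Exit without shutdown: the queued write is dropped with the store.
    let mut store = BatchedStore::open(&mut disk);
    store.write("47", b"How does it work");
    drop(store);
    assert!(disk.get("47").is_none());

    // Exit with shutdown: the queued write is persisted first.
    let mut store = BatchedStore::open(&mut disk);
    store.write("47", b"How does it work");
    store.shutdown();
    assert!(disk.get("47").is_some());
}
```

In this sketch the first write is lost because nothing forces a flush before the store goes away, which is the same shape as the problem described above for a short-lived CLI process.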
Awesome, thanks for finding that!
@azzamsa you really found a couple sneaky ones. :grin: That explains why I didn't run into the issue with Weird, since it's a server that doesn't exit immediately after it's done.
@Frando what do you think about printing a warning in Node, when it's dropped, if it hasn't been shut down yet?
Yes. In the team chat we agreed the current situation is not ideal. A warning is the minimum. Ideally we should allow committing transactions manually and/or shut down automatically when the last instance of a node is dropped (which will be hairy because shutdown is async).
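As a rough idea of what a warning on drop could look like, here is a small sketch using a synchronous `Drop` implementation. Everything in it is an assumption for illustration: `GuardedNode` is a hypothetical type rather than iroh's `Node`, its `shutdown` is synchronous whereas the real one is async, and the shared `warned` flag exists only so the behaviour can be observed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical node handle that remembers whether it was shut down.
struct GuardedNode {
    shut_down: bool,
    /// Set to true when the drop warning fires (illustration only).
    warned: Arc<AtomicBool>,
}

impl GuardedNode {
    fn new(warned: Arc<AtomicBool>) -> Self {
        Self { shut_down: false, warned }
    }

    /// Consume the node. A real implementation would flush pending writes here.
    fn shutdown(mut self) {
        self.shut_down = true;
        // `self` is dropped at the end of this function, after the flag is set.
    }
}

impl Drop for GuardedNode {
    fn drop(&mut self) {
        if !self.shut_down {
            self.warned.store(true, Ordering::SeqCst);
            eprintln!("warning: node dropped without shutdown; pending writes may be lost");
        }
    }
}

fn main() {
    let warned = Arc::new(AtomicBool::new(false));

    // Proper shutdown: no warning.
    GuardedNode::new(warned.clone()).shutdown();
    assert!(!warned.load(Ordering::SeqCst));

    // Dropped without shutdown: the warning fires.
    drop(GuardedNode::new(warned.clone()));
    assert!(warned.load(Ordering::SeqCst));
}
```

A guard like this can only warn. It cannot perform the flush itself from `drop` when the flush is async, which is the difficulty mentioned above.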
In any case, the fix for your example would look like this:
Wow, it worked like a charm!
I found the cause of your issue. You need to always call node.shutdown().await before exiting the process. Otherwise, pending write transactions in the blobs or docs store may not yet be persisted. We changed the stores a while ago to not flush the storage after each write but only after ~500ms or when shutdown is called. This greatly improves the performance of write-heavy workloads.
I should have reported this issue here instead of asking in the Discord servers. I've had this issue since I first started using Iroh. I attached the MWE in the Discord chat with steps to reproduce the issue, but I got the same error with 2-3 different approaches, and unfortunately, no one responded for several days. I should have come here and posted the MWE instead, as you fixed it in just 17 minutes.
you really found a couple sneaky ones. 😁 That explains why I didn't run into the issue with Weird, since it's a server that doesn't exit immediately after it's done.
Yeah. Finally, I can continue my work. This was a blocking issue for me. Also, thank you, @zicklag, for helping me through this.