Recently I was working with Playwright testing for one of my applications. And one of the key things that would be needed for a full, end-to-end testing, is a reproducible database.
The database we used was MongoDB. Some say it's a thing of the past. I say use whatever gets the job done.
Thing is, the most compact representation of our current dump file was created with mongodump
, using the --archive
and --gzip
flag. The usual way to restore that would be to use mongorestore
, and that works fine for me; just do spawn
of some processes and move on.
But my colleague uses Windows and refuses to install the proper $PATH stuff. So that doesn't work with him.
After some researching of MongoDB's dump format, I got tired and had ChatGPT do Deep Research continuing from what I found and had it generate some preliminary code. And of course, that didn't work, but it did give me quite valuable insights to how to work with the format, and I figured just a bit more editing should do the trick.
And here I'd like to present to you, the Mongorestore implementation, all rewritten in Typescript!
import * as fs from 'fs';
import { createGunzip } from 'zlib';
import { MongoClient, MongoClientOptions, Collection } from 'mongodb';
import { Transform, TransformCallback } from 'stream';
import { deserialize, EJSON } from 'bson';
const MAGIC_NUMBER = 0x8199e26d; // Expected magic header
/**
* A Transform stream that buffers incoming data until it has at least 4 bytes,
* then verifies that the first 4 bytes match the expected magic header.
*/
class MagicHeaderChecker extends Transform {
private buffer: Buffer = Buffer.alloc(0);
private checked = false;
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
if (!this.checked) {
// Accumulate chunks until we have at least 4 bytes.
this.buffer = Buffer.concat([this.buffer, chunk]);
if (this.buffer.length < 4) {
return callback();
}
// Verify the magic header (first 4 bytes as little-endian uint32).
// Read the first 4 bytes as a little-endian unsigned integer to verify the magic header.
// This value confirms that the stream is a valid MongoDB archive.
const magic = this.buffer.readUInt32LE(0);
// console.debug(`MagicHeaderChecker: Read magic number 0x${magic.toString(16)}`);
if (magic !== MAGIC_NUMBER) {
return callback(
new Error(
`Invalid magic header: expected 0x${MAGIC_NUMBER.toString(16)}, got 0x${magic.toString(
16
)}`
)
);
}
console.debug('MagicHeaderChecker: Valid header. Passing remaining bytes downstream.');
this.checked = true;
// Push any remaining bytes after the header.
// After validating the magic header, push any remaining bytes downstream for further processing.
this.push(this.buffer.slice(4));
this.buffer = Buffer.alloc(0);
callback();
} else {
// Pass along data normally.
this.push(chunk);
callback();
}
}
}
export class ArchiveBsonParser extends Transform {
private buffer = Buffer.alloc(0);
private droppedArchiveHeader = false;
constructor() {
super({ readableObjectMode: true });
}
_transform(chunk: Buffer, _enc: string, cb: TransformCallback) {
this.buffer = Buffer.concat([this.buffer, chunk]);
while (this.buffer.length >= 4) {
// Read the BSON document length from the first 4 bytes.
// This length indicates how many bytes the complete BSON document occupies.
const docLen = this.buffer.readInt32LE(0);
// Log the document length we read.
// console.debug(`ArchiveBsonParser: Document length read as ${docLen}`);
/*
About the -1 Marker
In your archive you’re seeing a document length of –1 repeatedly.
Although not officially documented in any public spec, many implementations of
the MongoDB archive (mongodump/mongorestore) have observed that a marker
value (often –1 or 0xffffffff) is used as a delimiter or an end-of-collection
marker. In your case it appears that the parser is encountering these markers.
They’re not “BSON documents” but are instead used to separate sections of the archive.
So yes, seeing –1 isn’t entirely unheard of—but it must be handled correctly
*/
// If we see a marker of -1 (0xffffffff), skip it.
// The BSON archive format may use -1 (0xffffffff) as a section or end-of-collection marker.
// When encountered, skip these 4 bytes and continue parsing.
if (docLen === -1) {
this.buffer = this.buffer.slice(4);
continue;
}
if (docLen < 5) {
// console.warn('ArchiveBsonParser: Document length too small, waiting for more data.');
break; // too small → wait for more or drop at flush
}
if (this.buffer.length < docLen) break;
const raw = this.buffer.slice(0, docLen);
this.buffer = this.buffer.slice(docLen);
try {
// Attempt to deserialize the raw BSON document.
// If deserialization fails, log the error and drop the document to maintain stream integrity.
const doc = deserialize(raw);
// if (!this.droppedArchiveHeader) {
// this.droppedArchiveHeader = true;
// continue; // skip the very first archive‑header doc
// }
if (!this.droppedArchiveHeader) {
// If the document has properties typical of an archive header (adjust these checks as needed),
// then skip it; otherwise, log a warning and push it.
if (doc && doc.options && doc.version) {
this.droppedArchiveHeader = true;
continue; // skip the archive header doc
} else {
console.warn(
'ArchiveBsonParser: First document does not look like an archive header. Not skipping it.'
);
this.droppedArchiveHeader = true;
}
}
// At this point, the document is considered a valid data or metadata document.
this.push(doc);
} catch (e) {
// corrupted doc → drop
console.error('Failed to deserialize document:', e); // Added error logging
}
}
cb();
}
_flush(cb: TransformCallback) {
// Log if any leftover data remains (should not normally happen).
if (this.buffer.length > 0) {
console.warn('ArchiveBsonParser: Flushing leftover bytes, length:', this.buffer.length);
}
cb();
// // ignore any leftover bytes
// cb();
}
}
export interface RestoreOptions {
uri: string;
db?: string; // Optional: target DB name. If not provided, the first metadata's DB is used.
archiveFile: string; // Path to the mongodump archive file.
gzip?: boolean; // True if the archive is gzip-compressed.
drop?: boolean; // Drop existing collections before restoring.
}
export async function restoreArchive(opts: RestoreOptions): Promise<void> {
const { uri, db: targetDbName, archiveFile, gzip = false, drop = false } = opts;
const mongoOptions: MongoClientOptions = {
// useNewUrlParser: true,
// useUnifiedTopology: true,
};
const client = new MongoClient(uri, mongoOptions);
await client.connect();
// Create a readable stream from the archive file.
const fileStream = fs.createReadStream(archiveFile);
// If the file is gzipped, pipe it through createGunzip(); otherwise, use the raw stream.
const decompressedStream = gzip ? fileStream.pipe(createGunzip()) : fileStream;
// Verify the magic header using our custom Transform.
const magicCheckedStream = decompressedStream.pipe(new MagicHeaderChecker());
// Pipe through BSONStream to decode BSON documents.
const bsonStream = magicCheckedStream.pipe(new ArchiveBsonParser());
let currentDbName = targetDbName;
let currentColl: Collection | null = null; // Current target collection.
let currentNs: string | null = null; // Current namespace ("db.collection").
const metaMap: Map<string, any> = new Map(); // Map to hold collection metadata.
// Process each BSON document from the stream.
for await (const doc of bsonStream) {
// console.log('Processing document:', JSON.stringify(doc, null, 2)); // Debug log
// Archive header: identified by fields like "server_version" and "tool_version".
if (doc.server_version && doc.tool_version) {
console.log(`Archive header: format version ${doc.version}, server ${doc.server_version}`);
continue;
}
// Collection metadata document: contains "metadata" (a JSON string), "db", and "collection".
if (doc.metadata && doc.db && doc.collection) {
const ns = `${doc.db}.${doc.collection}`;
try {
// EJSON is a specific dialect of JSON where we store specific types, along with the values too
const meta = EJSON.parse(doc.metadata);
// const meta = JSON.parse(doc.metadata);
metaMap.set(ns, meta);
} catch (e) {
console.error(`Error parsing metadata for ${ns}:`, e);
}
const dbInstance = client.db(doc.db);
// Drop the collection if the drop flag is set.
if (drop) {
try {
await dbInstance.collection(doc.collection).drop();
console.log(`Dropped existing collection ${ns}`);
} catch (e) {
// Ignore error if collection does not exist.
}
}
// Create the collection using any provided options.
try {
// const options = doc.metadata ? JSON.parse(doc.metadata).options || {} : {};
const options = doc.metadata ? (EJSON.parse(doc.metadata) as any)?.options || {} : {};
await dbInstance.createCollection(doc.collection, options);
console.log(`Created collection ${ns}`);
} catch (e) {
console.error(`Error creating collection ${ns}:`, e);
}
continue;
}
// Namespace header: indicated by the presence of "db" and "collection" without "metadata".
const dbName = doc.db ?? currentDbName;
const collection = doc.collection ?? currentColl;
// Identify the namespace header by checking for expected fields ('db' and 'collection').
// 'EOF', 'CRC' seems to be present too but it's not really in the spec.
// The namespace header provides the context (database and collection) for subsequent data documents.
if (doc.db && doc.collection && !doc.metadata) {
currentNs = `${doc.db}.${doc.collection}`;
currentColl = client.db(doc.db).collection(doc.collection);
continue;
}
// console.log('encountered data document', doc);
// Data document: should be inserted into the current collection.
if (!currentColl) {
throw new Error('Data document encountered before a namespace header was set.');
}
try {
await currentColl.insertOne(doc);
} catch (e) {
console.error(`Error inserting document into ${currentNs}:`, e);
}
}
// After processing all data, create indexes for each collection based on metadata.
const entriesArray = Array.from(metaMap.entries());
for (const [ns, meta] of entriesArray) {
const [dbName, collName] = ns.split('.');
const coll = client.db(dbName).collection(collName);
const indexes: any[] = meta.indexes || [];
// Skip the default _id index.
const indexSpecs = indexes.filter((idx) => idx.name !== '_id_');
if (indexSpecs.length > 0) {
try {
await coll.createIndexes(indexSpecs);
console.log(`Created ${indexSpecs.length} indexes for ${ns}`);
} catch (e) {
console.error(`Error creating indexes for ${ns}:`, e);
}
}
}
await client.close();
console.log('Restore completed.');
}