feat: Enhance session retrieval by implementing DAG structure for blob processing and improving JSON message extraction

This commit is contained in:
simos
2025-08-12 14:41:22 +03:00
parent 28e27ed2fb
commit 0f45472402

View File

@@ -590,18 +590,131 @@ router.get('/sessions/:sessionId', async (req, res) => {
mode: sqlite3.OPEN_READONLY
});
// Get all blobs (conversation data)
// Use ROW_NUMBER() for clean sequential numbering and rowid for chronological ordering
const blobs = await db.all(`
SELECT
ROW_NUMBER() OVER (ORDER BY rowid) as sequence_num,
rowid as original_rowid,
id,
data
FROM blobs
ORDER BY rowid ASC
// Get all blobs to build the DAG structure
const allBlobs = await db.all(`
SELECT rowid, id, data FROM blobs
`);
// Build the DAG structure from parent-child relationships
const blobMap = new Map(); // id -> blob data
const parentRefs = new Map(); // blob id -> [parent blob ids]
const childRefs = new Map(); // blob id -> [child blob ids]
const jsonBlobs = []; // Clean JSON messages
for (const blob of allBlobs) {
blobMap.set(blob.id, blob);
// Check if this is a JSON blob (actual message) or protobuf (DAG structure)
if (blob.data && blob.data[0] === 0x7B) { // Starts with '{' - JSON blob
try {
const parsed = JSON.parse(blob.data.toString('utf8'));
jsonBlobs.push({ ...blob, parsed });
} catch (e) {
console.log('Failed to parse JSON blob:', blob.rowid);
}
} else if (blob.data) { // Protobuf blob - extract parent references
const parents = [];
let i = 0;
// Scan for parent references (0x0A 0x20 followed by 32-byte hash)
while (i < blob.data.length - 33) {
if (blob.data[i] === 0x0A && blob.data[i+1] === 0x20) {
const parentHash = blob.data.slice(i+2, i+34).toString('hex');
if (blobMap.has(parentHash)) {
parents.push(parentHash);
}
i += 34;
} else {
i++;
}
}
if (parents.length > 0) {
parentRefs.set(blob.id, parents);
// Update child references
for (const parentId of parents) {
if (!childRefs.has(parentId)) {
childRefs.set(parentId, []);
}
childRefs.get(parentId).push(blob.id);
}
}
}
}
// Perform topological sort to get chronological order
const visited = new Set();
const sorted = [];
// DFS-based topological sort
function visit(nodeId) {
if (visited.has(nodeId)) return;
visited.add(nodeId);
// Visit all parents first (dependencies)
const parents = parentRefs.get(nodeId) || [];
for (const parentId of parents) {
visit(parentId);
}
// Add this node after all its parents
const blob = blobMap.get(nodeId);
if (blob) {
sorted.push(blob);
}
}
// Start with nodes that have no parents (roots)
for (const blob of allBlobs) {
if (!parentRefs.has(blob.id)) {
visit(blob.id);
}
}
// Visit any remaining nodes (disconnected components)
for (const blob of allBlobs) {
visit(blob.id);
}
// Now extract JSON messages in the order they appear in the sorted DAG
const messageOrder = new Map(); // JSON blob id -> order index
let orderIndex = 0;
for (const blob of sorted) {
// Check if this blob references any JSON messages
if (blob.data && blob.data[0] !== 0x7B) { // Protobuf blob
// Look for JSON blob references
for (const jsonBlob of jsonBlobs) {
try {
const jsonIdBytes = Buffer.from(jsonBlob.id, 'hex');
if (blob.data.includes(jsonIdBytes)) {
if (!messageOrder.has(jsonBlob.id)) {
messageOrder.set(jsonBlob.id, orderIndex++);
}
}
} catch (e) {
// Skip if can't convert ID
}
}
}
}
// Sort JSON blobs by their appearance order in the DAG
const sortedJsonBlobs = jsonBlobs.sort((a, b) => {
const orderA = messageOrder.get(a.id) ?? Number.MAX_SAFE_INTEGER;
const orderB = messageOrder.get(b.id) ?? Number.MAX_SAFE_INTEGER;
if (orderA !== orderB) return orderA - orderB;
// Fallback to rowid if not in order map
return a.rowid - b.rowid;
});
// Use sorted JSON blobs
const blobs = sortedJsonBlobs.map((blob, idx) => ({
...blob,
sequence_num: idx + 1,
original_rowid: blob.rowid
}));
// Get metadata from meta table
const metaRows = await db.all(`
SELECT key, value FROM meta
@@ -626,37 +739,14 @@ router.get('/sessions/:sessionId', async (req, res) => {
}
}
// Parse blob data to extract messages - only include blobs with valid JSON
// Extract messages from sorted JSON blobs
const messages = [];
for (const blob of blobs) {
try {
// Attempt direct JSON parse first
const raw = blob.data.toString('utf8');
let parsed;
let isValidJson = false;
// We already parsed JSON blobs earlier
const parsed = blob.parsed;
try {
parsed = JSON.parse(raw);
isValidJson = true;
} catch (_) {
// If not JSON, try to extract JSON from within binary-looking string
const cleaned = raw.replace(/[^\x09\x0A\x0D\x20-\x7E]/g, '');
const start = cleaned.indexOf('{');
const end = cleaned.lastIndexOf('}');
if (start !== -1 && end > start) {
const jsonStr = cleaned.slice(start, end + 1);
try {
parsed = JSON.parse(jsonStr);
isValidJson = true;
} catch (_) {
parsed = null;
isValidJson = false;
}
}
}
// Only include blobs that contain valid JSON data
if (isValidJson && parsed) {
if (parsed) {
// Filter out ONLY system messages at the server level
// Check both direct role and nested message.role
const role = parsed?.role || parsed?.message?.role;
@@ -670,7 +760,6 @@ router.get('/sessions/:sessionId', async (req, res) => {
content: parsed
});
}
// Skip non-JSON blobs (binary data) completely
} catch (e) {
// Skip blobs that cause errors
console.log(`Skipping blob ${blob.id}: ${e.message}`);