.stream()
從您的資料庫串流記錄,以便一次一個或分批取用,而無需先將整個結果集緩衝到記憶體中。
await Something.stream(criteria)
.eachRecord(async (record)=>{
});
參數 | 類型 | 詳細資訊 | |
---|---|---|---|
1 | criteria | 用於在資料庫中比對記錄的 Waterline 條件。 |
使用以下其中一種
.eachRecord(async (record)=>{ ... })
.eachBatch(async (records)=>{ ... })
您為 eachRecord()
或 eachBatch()
提供的自訂函式將接收以下參數
參數 | 類型 | 詳細資訊 | |
---|---|---|---|
1 | record 或 records | 目前的記錄,或目前的記錄批次。一個批次陣列將始終包含至少一筆記錄,並且永遠不會包含超過批次大小(預設為三十)的記錄。 |
名稱 | 類型 | 何時? |
---|---|---|
UsageError(用法錯誤) | 如果傳入無效的內容時拋出。 | |
AdapterError(轉接器錯誤) | 如果在資料庫轉接器中發生錯誤時拋出。 | |
Error(錯誤) | 如果發生任何其他非預期的情況時拋出。 |
請參閱 概念 > 模型與 ORM > 錯誤 以取得在 Sails 和 Waterline 中處理錯誤的範例。
.stream()
方法幾乎與 .find()
完全相同,只是它一次提取一批記錄。每次載入一批記錄時,您提供的迭代器函式會被呼叫一次或多次。如果您使用 .eachRecord()
,則您的每個記錄函式將針對批次中的每筆記錄呼叫一次。否則,使用 .eachBatch()
,您的每個批次函式將針對整個批次呼叫一次。
這對於處理非常大的結果集非常有用,這種結果集如果嘗試同時將整個集合保存在記憶體中,可能會使伺服器的可用 RAM 溢出。您可以使用 Waterline 的 .stream()
方法來執行您可能已經熟悉的 Mongo 指標操作:準備報告、在 Shell 腳本中迴圈處理和修改資料庫記錄、將大量資料從一個地方移動到另一個地方、執行複雜的轉換,甚至協調 Map/Reduce 作業。
我們在下面探討四種範例情境
一個動作,一次迭代資料庫中名為 Finn 的使用者
await User.stream({name:'Finn'})
.eachRecord(async (user)=>{
if (Math.random() > 0.5) {
throw new Error('Oops! This is a simulated error.');
}
sails.log(`Found a user ${user.id} named Finn.`);
});
一個回應動態產生的網站地圖的動作
// e.g. in an action that handles `GET /sitemap.xml`:
var sitemapXml = '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">';
await BlogPost.stream()
.limit(50000)
.sort('title ASC')
.eachRecord((blogPost)=>{
sitemapXml += (
'<url>\n'+
' <loc>https://blog.example.com/' + _.escape(encodeURIComponent(blogPost.slug))+'</loc>\n'+
' <lastmod>'+_.escape(blogPost.updatedAt)+'</lastmod>\n'+
'<changefreq>monthly</changefreq>\n'+
'</url>'
);
});
sitemapXml += '</urlset>';
.populate()
一個命令列腳本的程式碼片段,用於搜尋來自名為 "Bailey Bitterbumps" 的人的令人毛骨悚然的評論,並向當局報告
// e.g. in a shell script
var numReported = 0;
await Comment.stream({ author: 'Bailey Bitterbumps' })
.limit(1000)
.skip(40)
.sort('title ASC')
.populate('attachedFiles', {
limit: 3,
sort: 'updatedAt'
})
.populate('fromBlogPost')
.eachRecord(async (comment)=>{
var isCreepyEnoughToWorryAbout = comment.rawMessage.match(/creepy/) && comment.attachedFiles.length > 1;
if (!isCreepyEnoughToWorryAbout) {
return;
}
await sails.helpers.sendTemplateEmail.with({
template: 'email-creepy-comment-notification',
templateData: {
url: `https://blog.example.com/${comment.fromBlogPost.slug}/comments/${comment.slug}.`
},
to: '[email protected]',
subject: 'Creepy comment alert'
});
numReported++;
});
sails.log(`Successfully reported ${numReported} creepy comments.`);
如果我們執行前一個範例中的程式碼,我們將針對每條令人毛骨悚然的評論發送一封電子郵件…這可能會很多,了解 Bailey Bitterbumps 的人就知道。這不僅會很慢,還可能意味著向我們的 事務性電子郵件供應商 發送數千個單獨的 API 請求,快速超出我們的 API 速率限制。
對於這種情況,我們可以改用 .eachBatch()
來抓取正在提取的整批記錄,而不是一次處理單個記錄,從而大幅減少必要的 API 請求數量。
預設情況下,.stream()
使用的批次大小為 30。這表示它每次批次最多載入 30 筆記錄;因此,如果您使用 .eachBatch()
,您的自訂函式每次呼叫將收到 1 到 30 筆記錄。
若要增加或減少批次大小,請將額外參數傳遞給 .eachBatch()
.eachBatch(100, async (records)=>{
console.log(`Got ${records.length} records.`);
})
在您的程式碼中使用
.eachBatch()
不一定比使用.eachRecord()
更有效率或效率更低。那是因為,無論您使用哪個迭代器,Waterline 都會一次向資料庫請求多筆記錄(預設為 30 筆)。使用.eachBatch()
,您可以使用上述的額外參數輕鬆設定此批次大小。在使用.eachRecord
時也可以自訂批次大小(例如,為了避免受到您正在使用的第三方 API 的速率限制)。只需使用.meta()
。例如,.meta({batchSize: 100})
。
- 此方法可以與
await
、Promise 鏈或 傳統 Node 回呼 一起使用。.stream()
會在從任何迭代器收到第一個錯誤時立即中止並拋出錯誤。.stream()
一次一個、依序地在每個記錄或批次上執行提供的迭代器函式。在 Sails 1.1.0 之前的版本中,建議的.stream()
用法預期迭代器會調用回呼 (next
),該回呼作為第二個參數提供。只要您實際上不在函式簽名中包含第二個參數,這就不再是必要的。- 在 Sails v1.0 / Waterline 0.13 之前的版本中,此方法具有較低階的介面,公開了 可讀取的「物件串流」。這很強大,但往往容易出錯。新的、與轉接器無關的
.stream()
不依賴事件發射器或任何特定風格的 Node 串流。(需要讓它以舊的方式運作?別擔心,只需少量程式碼,您仍然可以使用新介面輕鬆建構與 streams2/streams3 相容的可讀取「物件串流」。)- 在此處閱讀更多關於建立
.stream()
的動機,包括其他範例、背景資訊和實作細節。