Phát trực tiếp nodejs

Use streaming SQL with Node. js là cách để bạn có thể tra cứu từng hàng của bảng trong cơ sở dữ liệu và thao tác trên đó

Cơ sở dữ liệu SQL nói chung là rất tuyệt vời trong việc xử lý lượng dữ liệu cho nhau. Mình không nghĩ rằng đó là một sự mạnh mẽ khi nói rằng hầu hết các công ty mà mình thấy sử dụng Hadoop cho hầu hết “tất cả” bài toán về xử lý dữ liệu. Trong khi sẽ tốt hơn và tiết kiệm chi phí hơn khi chỉ cần thêm một mục hoặc bổ sung RAM vào máy chủ Postgres / MySQL của họ. Bạn có thể thực hiện bộ lọc và tổng hợp khá tốt trong SQL. Nhưng đôi khi, những nhiệm vụ mạnh mẽ này không hỗ trợ những thứ bạn cần. Trong một số trường hợp bạn sẽ muốn xử lý từng hàng trong nút. js

as Async

Hãy tưởng tượng trong bài toán này

  • Chúng ta có một cơ sở dữ liệu chứa một lượng lớn ID của các Tweet từ Twitter. Ít nhất 100 nghìn
  • Chúng ta yêu cầu số lượng lượt thích của mỗi tweet từ Twitter
  • Chúng ta muốn xử lý nhiệm vụ này dưới nền và không quá nhiều bộ nhớ

Chúng ta sẽ truyền kết quả từ truy vấn của chúng ta để lấy ra từng hàng từ cơ sở dữ liệu. Ví dụ dưới đây sẽ hoạt động khá tốt, bất kể bạn muốn sử dụng  

Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
0 cho Postgres, 
Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
1 cho MySQL hoặc
Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
2 cho SQLite

// thay thế để phù hợp với CSDL của bạn
import connect, {sql} from '@databases/pg';
const db = connect();

export default async function updateTweets() {
  const tweets = db.queryStream(
    sql`SELECT id, likes FROM tweets;`
  );

  for await (const tweet of tweets) {
    const likes = await getLikes(tweet.id);
    if (likes !== tweet.likes) {
      await db.query(
        sql`
          UPDATE tweets
          SET likes = ${likes}
          WHERE id = ${tweet.id};
        `
      );
    }
  }
}

Mã này phải có phiên bản gần nhất hoặc mới nhất của Node, hoặc sử dụng bộ chuyển mã như babel để hỗ trợ. Nó sẽ truyền phát các tweet từ cơ sở dữ liệu, cho phép một bộ đệm của một số tweet trước khi bắt đầu làm chậm quá trình gửi dữ liệu. Một khi chúng tôi nhận được một tweet mới và kết thúc quá trình xử lý tweet trước đó, chúng tôi có thể xử lý nó

Nút. luồng js

Đây là một phương án khác cho việc sử dụng streaming SQL với Node. js

Tưởng tượng bạn cần xuất một bảng dữ liệu lớn ra tệp CSV để xử lý ở một hệ thống khác. Ví dụ một phần mềm phân tích hoặc nhập dữ liệu vào một gói dịch vụ kế toán không có thời hạn. Bạn có thể đợi cho đến khi bạn lấy hết tất cả các bản ghi, nhưng nó sẽ thuận lợi hơn khi chúng ta có thể bắt đầu gửi dữ liệu đi ngay khi nó sẵn sàng. Điều này thực sự hữu ích khi khách hàng tải tệp về thông tin qua một kết nối internet chậm

Ví dụ này cũng hoạt động tốt, bất kể là sử dụng 

Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
0 cho Postgres, 
Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
1 cho MySQL. Tuy nhiên, hiện tại nó không hỗ trợ SQLite

// thay thế để phù hợp với CSDL của bạn
import connect, {sql} from '@databases/pg';
import {Map} from 'barrage';
import stringify from 'csv-stringify';

const db = connect();

export default function getTweetsCSV() {
  const tweets = db.queryNodeStream(
    sql`SELECT id, likes FROM tweets;`
  );
  const map = new Map(tweet => [tweet.id, tweet.likes]);
  const stringifier = stringify();
  stringifier.write(['Tweet ID', 'Likes']);

  tweets.pipe(map).pipe(stringifier);
  tweets.on('error', e => stringifier.emit('error', e);
  map.on('error', e => stringifier.emit('error', e);

  return stringifier;
}

Ở đây chúng ta yêu cầu tất cả các tweet như một nút. Luồng đối tượng js. Sau đó chúng ta phân phối luồng đó vào một 

Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
5 luồng. Cuối cùng chúng ta sẽ phân phối luồng ánh xạ này vào luồng csv-stringifier. Nó sẽ nhận một mảng cho mỗi dòng và xuất tệp CSV. Chúng ta còn có thể phân phối nó qua
Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!
6 nếu máy khách hỗ trợ gzip

Luồng là các đối tượng cho phép bạn đọc dữ liệu từ một nguồn và ghi dự kiến ​​đến một đích. Trong Node. js, có 4 loại Stream

  • Readable - Là Stream được sử dụng để kích hoạt hoạt động đọc
  • Có thể ghi - Là Stream được sử dụng cho hoạt động ghi
  • Duplex - Is Stream được sử dụng cho cả mục đích ghi và đọc
  • Transform - This is a type Duplex Stream, other at place is the first result is being tính toán dựa trên cơ sở dữ liệu bạn đã nhập vào

Mỗi loại Stream là một sự kiện có thể hiện của đối tượng EventEmitter và ném một số sự kiện vào các thời điểm khác nhau. Danh sách sau đây liệt kê một số điều kiện thường được sử dụng

  • dữ liệu - Sự kiện này được kích hoạt khi dữ liệu có sẵn cho hoạt động đọc
  • end - Sự kiện này được kích hoạt khi không còn dữ liệu nào để đọc nữa
  • error - This event is being activate khi xảy ra bất kỳ lỗi nào trong quá trình đọc và ghi dữ liệu
  • kết thúc - Sự kiện này được kích hoạt khi tất cả dữ liệu đã được chuyển hết tới vùng hệ thống cơ sở

Phần tiếp theo mình sẽ trình bày chi tiết các hoạt động thường sử dụng trên các Stream

Đọc dữ liệu từ Stream trong Node. js

Đầu tiên, bạn tạo một tệp văn bản với tên đầu vào. txt has content after

Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!

Create a js file has main name. js. Trong tệp này, đầu tiên bạn khai báo Mô-đun fs (ở đây là Mô-đun cho hoạt động của Tệp I/O) do sử dụng phương thức yêu cầu(). Sau đó sử dụng phương thức createReadStream() nhận tham số là tên của tệp văn bản bạn đã tạo trước đó để đọc dữ liệu từ đó

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");

Now now run main. js to view results

$ node main.js

Kiểm tra kết quả đầu ra

Ket thuc chuong trinh
Hoclaptrinh la trang Web huong dan cac bai lap trinh
hoan toan mien phi cho tat ca moi nguoi!!!!!

Ghi dữ liệu vào Stream trong Node. js

You also create a main. js as on. Khác ở chỗ là thay vì sử dụng createReadStream(), bạn sử dụng phương thức createWriteStream() nhận tham số là tệp để chứa kết quả bạn cần ghi

var fs = require("fs");
var data = 'VietNamVoDoi';

// Tao mot Writable Stream
var writerStream = fs.createWriteStream('output.txt');

// Ghi du lieu toi Stream theo ma hoa utf8
writerStream.write(data,'UTF8');

// Danh dau diem cuoi cua file (end of file)
writerStream.end();

// Xu ly cac su kien lien quan toi Stream --> finish, va error
writerStream.on('finish', function() {
    console.log("Ket thuc hoat dong ghi.");
});

writerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");

Now now run main. js to view results

$ node main.js

Kiểm tra kết quả

________số 8

Bây giờ, bạn mở đầu ra. txt đã được tạo trong thư mục hiện tại và kiểm tra nội dung kết quả thu được

VietNamVoDoi

Ý nghĩa Piping Stream trong Node. js

Đường ống là một kỹ thuật. Với kỹ thuật này, chúng tôi cung cấp kết quả đầu ra của một Stream để làm dữ liệu đầu vào cho một Stream khác. Không có giới hạn nào về hoạt động Piping này, tức là quá trình trên có thể vẫn tiếp tục

Để hiểu thêm về khái niệm này, các bạn theo dõi ví dụ dưới đây. Trong ví dụ này, mình đọc dữ liệu từ một tệp, sau đó ghi dữ liệu đó vào một tệp khác

Đầu tiên, bạn tạo tệp js có tên chính. js chưa hết hạn. Trong tệp này, bạn sử dụng hai phương thức đã trình bày ở trên là createReadStream() và createWriteStream() tương ứng để đọc và ghi dữ liệu. Tiếp theo đó, sử dụng phương thức pipe() để thực hiện kỹ thuật Piping Stream như sau

// thay thế để phù hợp với CSDL của bạn
import connect, {sql} from '@databases/pg';
import {Map} from 'barrage';
import stringify from 'csv-stringify';

const db = connect();

export default function getTweetsCSV() {
  const tweets = db.queryNodeStream(
    sql`SELECT id, likes FROM tweets;`
  );
  const map = new Map(tweet => [tweet.id, tweet.likes]);
  const stringifier = stringify();
  stringifier.write(['Tweet ID', 'Likes']);

  tweets.pipe(map).pipe(stringifier);
  tweets.on('error', e => stringifier.emit('error', e);
  map.on('error', e => stringifier.emit('error', e);

  return stringifier;
}
0

chạy chính. js to view results

$ node main.js

Kiểm tra kết quả

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
0

Open output. txt đã được tạo trong thư mục hiện tại của bạn và kiểm tra nội dung

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
1

Ý nghĩa Chaining Stream trong Node. js

Chuỗi là một kỹ thuật để kết nối kết quả đầu ra của một Luồng với một Luồng khác và tạo một chuỗi bao gồm nhiều hoạt động Luồng. Thường thì nó được sử dụng với các Piping hoạt động

Ví dụ sau minh họa cách kết hợp hai hoạt động Piping và Chaining. Đầu tiên chúng ta nén một tập tin, sau đó giải nén tập tin đó

Create main. js. Trong tệp này, mình cần khai báo Mô-đun zlip cung cấp phương thức tạoGzip() cho hoạt động nén

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
2

chạy chính. js to view results

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
3

Kiểm tra kết quả

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
4

Sau khi kiểm tra, bạn sẽ thấy rằng đầu vào. txt đã được nén và nó đã tạo một đầu vào. txt. gz in the current directory. Bây giờ, bạn thử giải nén cùng một tệp trên bằng cách sử dụng phương thức tạoGunzip() của Mô-đun zlib như sau

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
5

chạy chính. js to view results

$ node main.js

Kiểm tra kết quả

var fs = require("fs");
var data = '';

// Tao mot Readable Stream
var readerStream = fs.createReadStream('input.txt');

// Thiet lap encoding la utf8. 
readerStream.setEncoding('UTF8');

// Xu ly cac su kien lien quan toi Stream --> data, end, va error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function(){
   console.log(data);
});

readerStream.on('error', function(err){
   console.log(err.stack);
});

console.log("Ket thuc chuong trinh");
7

Một loạt bài hướng dẫn nghiên cứu NodeJS cơ bản và nâng cao của chúng tôi dựa trên các nguồn tài liệu của chúng tôi. Tutorialspoint và W3Schools