Làm cách nào để xuất dữ liệu từ Python sang PostgreSQL?
Là thợ sửa ống nước dữ liệu được tôn vinh, chúng tôi thường được giao nhiệm vụ tải dữ liệu được tìm nạp từ một nguồn từ xa vào hệ thống của chúng tôi. Nếu chúng tôi may mắn, dữ liệu được tuần tự hóa dưới dạng JSON hoặc YAML. Khi kém may mắn hơn, chúng tôi nhận được một bảng tính Excel hoặc tệp CSV luôn bị hỏng theo một cách nào đó, không thể giải thích được Show Dữ liệu từ các công ty lớn hoặc hệ thống cũ bằng cách nào đó luôn được mã hóa theo cách kỳ lạ và Quản trị viên hệ thống luôn nghĩ rằng họ giúp chúng tôi bằng cách nén các tệp (vui lòng gzip) hoặc chia chúng thành các tệp nhỏ hơn với tên ngẫu nhiên Các dịch vụ hiện đại có thể cung cấp một API phù hợp, nhưng thường thì chúng ta không cần tìm nạp tệp từ FTP, SFTP, S3 hoặc một số kho tiền độc quyền chỉ hoạt động trên Windows Trong bài viết này, chúng ta khám phá cách tốt nhất để nhập dữ liệu lộn xộn từ nguồn từ xa vào PostgreSQL Để cung cấp một giải pháp thực tế, khả thi, chúng tôi đặt các vai trò cơ bản sau
Tôi đã tìm thấy API công khai tuyệt vời này dành cho bia, vì vậy chúng tôi sẽ nhập dữ liệu vào bảng bia trong cơ sở dữ liệu Một loại bia từ API trông như thế này $ curl https://api.punkapi.com/v2/beers/?per_page=1&page=1 [ { "id": 1, "name": "Buzz", "tagline": "A Real Bitter Experience.", "first_brewed": "09/2007", "description": "A light, crisp and bitter IPA ...", "image_url": "https://images.punkapi.com/v2/keg.png", "abv": 4.5, "ibu": 60, "target_fg": 1010, "target_og": 1044, "ebc": 20, "srm": 10, "ph": 4.4, "attenuation_level": 75, "volume": { "value": 20, "unit": "litres" }, "contributed_by": "Sam Mason Tôi đã cắt bớt đầu ra cho ngắn gọn, nhưng có rất nhiều thông tin về các loại bia ở đây. Trong bài viết này, chúng tôi muốn nhập tất cả các trường trước from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 17 vào một bảng trong cơ sở dữ liệu Trường from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 18 được lồng vào nhau. Chúng tôi chỉ muốn trích xuất from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 19 từ trường và lưu nó vào trường có tên là from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 18 trong bảng volume = beer['volume']['value'] Trường >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,1 chỉ chứa năm và tháng, và trong một số trường hợp, chỉ chứa năm. Chúng tôi muốn chuyển đổi giá trị thành một ngày hợp lệ. Ví dụ: giá trị >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,2 sẽ được chuyển thành ngày >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,3. Giá trị >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,4 sẽ được chuyển thành ngày >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,5 Hãy viết một hàm đơn giản để chuyển đổi giá trị văn bản trong trường, sang Python >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,6 import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format' Hãy nhanh chóng đảm bảo rằng nó hoạt động >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1) Trong cuộc sống thực, các phép biến đổi có thể phức tạp hơn nhiều. Nhưng với mục đích của chúng tôi, điều này là quá đủ API cung cấp kết quả được phân trang. Để đóng gói phân trang, chúng tôi tạo một trình tạo tạo ra từng loại bia một from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 1 Và để sử dụng hàm tạo, chúng ta gọi và lặp lại nó >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5, Bạn sẽ nhận thấy rằng kết quả đầu tiên của mỗi trang mất nhiều thời gian hơn một chút. Điều này là do mạng yêu cầu tìm nạp trang Bước tiếp theo là tạo một bảng trong cơ sở dữ liệu để nhập dữ liệu vào Tạo cơ sở dữ liệu $ createdb -O haki testload
Thay đổi >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,7 trong ví dụ thành người dùng cục bộ của bạn Để kết nối từ Python với cơ sở dữ liệu PostgreSQL, chúng tôi sử dụng psycopg $ python -m pip install psycopg2
Sử dụng psycopg, tạo kết nối đến cơ sở dữ liệu import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True Chúng tôi đặt để mọi lệnh chúng tôi thực hiện sẽ có hiệu lực ngay lập tức. Đối với mục đích của bài viết này, điều này là tốt Bây giờ chúng ta có một kết nối, chúng ta có thể viết một hàm để tạo một bảng def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """) Hàm nhận một con trỏ và tạo một bảng chưa đăng nhập có tên là >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,9 BẢNG CHƯA ĐĂNG NHẬP Dữ liệu được ghi vào một sẽ không được ghi vào nhật ký ghi trước (WAL), làm cho nó trở nên lý tưởng cho các bảng trung gian. Lưu ý rằng bảng $ createdb -O haki testload
0 sẽ không được khôi phục trong trường hợp xảy ra sự cố và sẽ không được sao chépSử dụng kết nối mà chúng ta đã tạo trước đó, đây là cách chức năng được sử dụng volume = beer['volume']['value']0 Bây giờ chúng ta đã sẵn sàng để chuyển sang phần tiếp theo Trong suốt bài viết này, chúng tôi quan tâm đến hai số liệu chính. thời gian và ký ức Để đo thời gian cho từng phương pháp, chúng tôi sử dụng mô-đun $ createdb -O haki testload
1 tích hợpvolume = beer['volume']['value']1 Chức năng này cung cấp cho đồng hồ độ phân giải cao nhất hiện có, khiến nó trở nên lý tưởng cho các mục đích của chúng tôi Để đo mức tiêu thụ bộ nhớ, chúng tôi sẽ sử dụng gói memory-profiler volume = beer['volume']['value']2 Gói này cung cấp mức sử dụng bộ nhớ và mức sử dụng bộ nhớ gia tăng cho mỗi dòng trong mã. Điều này rất hữu ích khi tối ưu hóa bộ nhớ. Để minh họa, đây là ví dụ được cung cấp trong PyPI volume = beer['volume']['value']3 Phần thú vị là cột $ createdb -O haki testload
3 hiển thị bộ nhớ bổ sung được phân bổ bởi mã trong mỗi dòngTrong bài viết này, chúng tôi quan tâm đến bộ nhớ cao nhất được sử dụng bởi chức năng. Bộ nhớ cao nhất là sự khác biệt giữa giá trị bắt đầu của cột "Mem usage" và giá trị cao nhất (còn được gọi là "high watermark") Để lấy danh sách "Mem sử dụng" ta dùng hàm $ createdb -O haki testload
4 từ $ createdb -O haki testload
5volume = beer['volume']['value']4 Khi được sử dụng như thế này, hàm $ createdb -O haki testload
4 sẽ thực thi hàm $ createdb -O haki testload
7 với $ createdb -O haki testload
8 và $ createdb -O haki testload
9 được cung cấp, nhưng cũng khởi chạy một quy trình khác trong nền để giám sát việc sử dụng bộ nhớ mỗi __18_______0 giâyĐối với các hoạt động rất nhanh, chức năng $ createdb -O haki testload
7 có thể được thực hiện nhiều lần. Bằng cách đặt $ python -m pip install psycopg2
0 thành một giá trị, chúng tôi buộc nó chỉ thực thi một lầnĐối số $ python -m pip install psycopg2
3 yêu cầu hàm trả về kết quả của $ createdb -O haki testload
7Để kết hợp tất cả lại với nhau, chúng tôi tạo trình trang trí sau để đo và báo cáo thời gian và bộ nhớ volume = beer['volume']['value']5 Để loại bỏ ảnh hưởng lẫn nhau của thời gian trên bộ nhớ và ngược lại, chúng tôi thực hiện chức năng hai lần. Đầu tiên là tính thời gian, thứ hai là đo mức sử dụng bộ nhớ Trình trang trí sẽ in tên hàm và bất kỳ đối số từ khóa nào, đồng thời báo cáo thời gian và bộ nhớ đã sử dụng volume = beer['volume']['value']6 Chỉ các đối số từ khóa được in. Điều này là có chủ ý, chúng tôi sẽ sử dụng nó trong các thử nghiệm tham số hóa Tại thời điểm viết bài, API bia chỉ chứa 325 loại bia. Để làm việc trên một tập dữ liệu lớn, chúng tôi sao chép nó 100 lần và lưu trữ trong bộ nhớ. Tập dữ liệu kết quả chứa 32.500 loại bia volume = beer['volume']['value']7 Để bắt chước API từ xa, các chức năng của chúng tôi sẽ chấp nhận các trình vòng lặp tương tự như giá trị trả về của $ python -m pip install psycopg2
6volume = beer['volume']['value']8 Đối với điểm chuẩn, chúng tôi sẽ nhập dữ liệu bia vào cơ sở dữ liệu. Để loại bỏ các ảnh hưởng bên ngoài chẳng hạn như mạng, chúng tôi tìm nạp dữ liệu từ API trước và phân phát cục bộ Để có được thời gian chính xác, chúng tôi "giả mạo" API từ xa volume = beer['volume']['value']9 Trong một tình huống thực tế, bạn sẽ sử dụng trực tiếp hàm $ python -m pip install psycopg2
6import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'0 Bây giờ chúng tôi đã sẵn sàng để bắt đầu Để thiết lập đường cơ sở, chúng tôi bắt đầu với cách tiếp cận đơn giản nhất, chèn từng hàng một import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'1 Lưu ý rằng khi chúng tôi lặp lại các loại bia, chúng tôi chuyển đổi >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,1 thành >>> beers = iter_beers_from_api() >>> next(beers) {'id': 1, 'name': 'Buzz', 'tagline': 'A Real Bitter Experience.', 'first_brewed': '09/2007', 'description': 'A light, crisp and bitter IPA brewed...', 'image_url': 'https://images.punkapi.com/v2/keg.png', 'abv': 4.5, 'ibu': 60, 'target_fg': 1010, ... } >>> next(beers) {'id': 2, 'name': 'Trashy Blonde', 'tagline': "You Know You Shouldn't", 'first_brewed': '04/2008', 'description': 'A titillating, ...', 'image_url': 'https://images.punkapi.com/v2/2.png', 'abv': 4.1, 'ibu': 41.5,6 và trích xuất giá trị âm lượng từ trường from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 18 lồng nhau Chạy chức năng này tạo ra đầu ra sau import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'2 Hàm mất 129 giây để nhập 32K hàng. Trình lược tả bộ nhớ cho thấy chức năng này tiêu tốn rất ít bộ nhớ Theo trực giác, việc chèn từng hàng một nghe có vẻ không hiệu quả lắm. Việc chuyển đổi bối cảnh liên tục giữa chương trình và cơ sở dữ liệu phải làm chậm nó lại Psycopg2 cung cấp một cách để chèn nhiều hàng cùng một lúc bằng cách sử dụng. Từ các tài liệu
Âm thanh đầy hứa hẹn Hãy thử nhập dữ liệu bằng cách sử dụng import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True1 import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'3 Hàm trông rất giống với hàm trước đó và các phép biến đổi cũng giống như vậy. Sự khác biệt chính ở đây là trước tiên chúng tôi chuyển đổi tất cả dữ liệu trong bộ nhớ và chỉ sau đó nhập dữ liệu đó vào cơ sở dữ liệu Chạy chức năng này tạo ra đầu ra sau import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'4 Điều này thật đáng thất vọng. Thời gian chỉ tốt hơn một chút, nhưng chức năng hiện tiêu tốn 2. 7MB bộ nhớ Để xem xét mức sử dụng bộ nhớ, một tệp JSON chỉ chứa dữ liệu chúng tôi nhập nặng 25 MB trên đĩa. Xem xét tỷ lệ, sử dụng phương pháp này để nhập tệp 1GB sẽ cần 110MB bộ nhớ Phương pháp trước đó tiêu tốn rất nhiều bộ nhớ vì dữ liệu đã chuyển đổi được lưu trữ trong bộ nhớ trước khi được psycopg xử lý Hãy xem liệu chúng ta có thể sử dụng trình vòng lặp để tránh lưu trữ dữ liệu trong bộ nhớ không import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'5 Sự khác biệt ở đây là dữ liệu được chuyển đổi được "truyền trực tuyến" vào import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True1 bằng cách sử dụng trình vòng lặp Hàm này tạo ra kết quả sau import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'6 Giải pháp "phát trực tuyến" của chúng tôi hoạt động như mong đợi và chúng tôi đã cố gắng đưa bộ nhớ về 0. Tuy nhiên, thời gian vẫn gần như giống nhau, thậm chí so với phương pháp từng người một Tài liệu về psycopg có một lưu ý rất thú vị về import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True1 trong
Vì vậy, chúng tôi đã làm điều đó sai tất cả cùng Chức năng ngay bên dưới phần này là
Hãy triển khai chức năng tải bằng cách sử dụng import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5 import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'7 Thực thi chức năng import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'8 Ồ. Đó là một bước nhảy vọt. Chức năng hoàn thành chỉ trong chưa đầy 4 giây. Nhanh hơn ~33 lần so với 129 giây mà chúng tôi đã bắt đầu Hàm import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5 sử dụng ít bộ nhớ hơn hàm import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True1 cho cùng một dữ liệu. Hãy cố gắng loại bỏ bộ nhớ bằng cách "truyền" dữ liệu vào import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5 bằng cách sử dụng trình vòng lặp import datetime def parse_first_brewed(text: str) -> datetime.date: parts = text.split('/') if len(parts) == 2: return datetime.date(int(parts[1]), int(parts[0]), 1) elif len(parts) == 1: return datetime.date(int(parts[0]), 1, 1) else: assert False, 'Unknown date format'9 Thực thi chức năng >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)0 Chúng tôi có khoảng thời gian gần giống nhau, nhưng với ít bộ nhớ hơn Tuy nhiên, khi đọc, lập luận def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)1 đã thu hút sự chú ý của tôi
Tài liệu trước đây đã tuyên bố rằng chức năng này hoạt động tốt hơn vì nó thực hiện ít vòng lặp hơn đối với cơ sở dữ liệu. Nếu đó là trường hợp, kích thước trang lớn hơn sẽ giảm số vòng lặp và dẫn đến thời gian tải nhanh hơn Hãy thêm một đối số cho kích thước trang vào chức năng của chúng tôi để chúng tôi có thể thử nghiệm >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)1 Kích thước trang mặc định là 100. Hãy đánh giá các giá trị khác nhau và so sánh kết quả >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)2 Chúng tôi đã nhận được một số kết quả thú vị, hãy phá vỡ nó
Kết quả cho thấy có sự đánh đổi giữa bộ nhớ và tốc độ. Trong trường hợp này, có vẻ như điểm hấp dẫn là kích thước trang là 1000 Những viên ngọc quý trong tài liệu của psycopg không kết thúc bằng import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5. Trong khi lướt qua tài liệu, một chức năng khác có tên đã thu hút sự chú ý của tôi
Hàm def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)4 hoạt động bằng cách tạo một danh sách GIÁ TRỊ khổng lồ cho truy vấn Hãy cho nó một spin >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)3 Nhập bia bằng chức năng >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)4 Vì vậy, ngay lập tức, chúng tôi đã tăng tốc một chút so với import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5. Tuy nhiên bộ nhớ hơi cao Giống như chúng tôi đã làm trước đây, để giảm mức tiêu thụ bộ nhớ, chúng tôi cố gắng tránh lưu trữ dữ liệu trong bộ nhớ bằng cách sử dụng trình vòng lặp thay vì danh sách >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)5 Thực hiện chức năng tạo ra các kết quả sau >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)6 Vì vậy, thời gian gần như giống nhau, nhưng bộ nhớ trở về số không Giống như import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5, hàm def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)4 cũng chấp nhận đối số def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)1 >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)7 Thực thi với các kích thước trang khác nhau >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)8 Cũng giống như import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5, chúng tôi nhận thấy sự đánh đổi giữa bộ nhớ và tốc độ. Ở đây cũng vậy, điểm hấp dẫn là khoảng kích thước trang 1000. Tuy nhiên, sử dụng def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)4, chúng tôi nhận được kết quả nhanh hơn ~20% so với cùng kích thước trang khi sử dụng import psycopg2 connection = psycopg2.connect( host="localhost", database="testload", user="haki", password=None, ) connection.autocommit = True5 Tài liệu chính thức cho PostgreSQL có toàn bộ phần về. Theo tài liệu, cách tốt nhất để tải dữ liệu vào cơ sở dữ liệu là sử dụng lệnh volume = beer['volume']['value']03 Để sử dụng volume = beer['volume']['value']03 từ Python, psycopg cung cấp một chức năng đặc biệt gọi là. Lệnh volume = beer['volume']['value']03 yêu cầu tệp CSV. Hãy xem liệu chúng ta có thể chuyển đổi dữ liệu của mình thành CSV và tải dữ liệu đó vào cơ sở dữ liệu bằng cách sử dụng volume = beer['volume']['value']05 >>> parse_first_brewed('09/2007') datetime.date(2007, 9, 1) >>> parse_first_brewed('2006') datetime.date(2006, 1, 1)9 Hãy phá vỡ nó
Bây giờ hãy xem nếu tất cả công việc khó khăn này được đền đáp from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 10 Lệnh volume = beer['volume']['value']03 là lệnh nhanh nhất mà chúng tôi từng thấy cho đến nay. Sử dụng volume = beer['volume']['value']22, quy trình hoàn tất trong chưa đầy một giây. Tuy nhiên, có vẻ như phương pháp này lãng phí hơn nhiều về việc sử dụng bộ nhớ. Hàm này tiêu tốn 99 MB, lớn hơn gấp đôi kích thước tệp JSON của chúng tôi trên đĩa Một trong những nhược điểm chính của việc sử dụng bản sao với volume = beer['volume']['value']16 là toàn bộ tệp được tạo trong bộ nhớ. Điều gì sẽ xảy ra nếu thay vì tạo toàn bộ tệp trong bộ nhớ, chúng tôi tạo một đối tượng giống như tệp sẽ hoạt động như một vùng đệm giữa nguồn từ xa và lệnh volume = beer['volume']['value']22. Bộ đệm sẽ sử dụng JSON thông qua trình vòng lặp, làm sạch và chuyển đổi dữ liệu, đồng thời xuất ra CSV sạchSao chép dữ liệu từ Trình lặp chuỗi (yuml. Tôi) Lấy cảm hứng từ câu trả lời tràn ngăn xếp này, chúng tôi đã tạo một đối tượng cung cấp một trình vòng lặp và cung cấp một giao diện giống như tệp from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 11 Để minh họa cách thức hoạt động của tính năng này, đây là cách một đối tượng giống như tệp CSV có thể được tạo từ danh sách các số from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 12 Lưu ý rằng chúng tôi đã sử dụng volume = beer['volume']['value']25 như một tệp. Bên trong, nó chỉ tìm nạp các hàng từ volume = beer['volume']['value']26 khi bộ đệm dòng bên trong của nó trống Chức năng tải bằng cách sử dụng volume = beer['volume']['value']27 trông như thế này from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 13 Sự khác biệt chính là tệp CSV của bia được sử dụng theo yêu cầu và dữ liệu không được lưu trữ trong bộ nhớ sau khi được sử dụng Hãy thực hiện chức năng và xem kết quả from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 14 Tuyệt. Thời gian thấp và bộ nhớ trở về 0 Trong nỗ lực giảm hiệu suất cuối cùng, chúng tôi nhận thấy rằng giống như def create_staging_table(cursor) -> None: cursor.execute(""" DROP TABLE IF EXISTS staging_beers; CREATE UNLOGGED TABLE staging_beers ( id INTEGER, name TEXT, tagline TEXT, first_brewed DATE, description TEXT, image_url TEXT, abv DECIMAL, ibu DECIMAL, target_fg DECIMAL, target_og DECIMAL, ebc DECIMAL, srm DECIMAL, ph DECIMAL, attenuation_level DECIMAL, brewers_tips TEXT, contributed_by TEXT, volume INTEGER ); """)1, lệnh volume = beer['volume']['value']03 cũng chấp nhận một đối số tương tự có tên là volume = beer['volume']['value']30
Hãy thêm một đối số volume = beer['volume']['value']30 vào hàm from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 15 Giá trị mặc định cho kích thước là 8192, tức là volume = beer['volume']['value']32, vì vậy chúng tôi sẽ giữ kích thước ở lũy thừa 2 from typing import Iterator, Dict, Any from urllib.parse import urlencode import requests def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]: session = requests.Session() page = 1 while True: response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({ 'page': page, 'per_page': page_size })) response.raise_for_status() data = response.json() if not data: break yield from data page += 16 Không giống như các ví dụ trước, có vẻ như không có sự đánh đổi giữa tốc độ và bộ nhớ. Điều này có ý nghĩa bởi vì phương pháp này được thiết kế để không tiêu tốn bộ nhớ. Tuy nhiên, chúng tôi nhận được thời gian khác nhau khi thay đổi kích thước trang. Đối với tập dữ liệu của chúng tôi, 8192 mặc định là điểm hấp dẫn Tóm tắt kết quả Chức năngThời gian (giây)Bộ nhớ (MB)______5_______33128. 80. 08203125____5_______34124. 72. 765625____5_______35129. 30. 0____5_______363. 9172. 50390625____5_______37130. 20. 0____5_______384. 3330. 0____5_______392. 5370. 2265625____5_______402. 58525. 4453125____5_______413. 6664. 50390625____5_______42127. 40. 0____5_______433. 6770. 0____5_______441. 4680. 0____5_______451. 5032. 25____5_______460. 627499. 109375____5_______470. 45360. 0____5_______480. 45960. 0____5_______490. 46490. 0____5_______500. 61710. 0 Câu hỏi lớn bây giờ là Tôi nên sử dụng cái gì? Mỗi phương pháp đều có ưu nhược điểm riêng và phù hợp với từng trường hợp khác nhau. Mua mang về Ưu tiên các phương pháp tích hợp sẵn cho các loại dữ liệu phức tạp Thực thi nhiều, thực thi các giá trị và xử lý hàng loạt việc chuyển đổi giữa các loại dữ liệu Python sang các loại cơ sở dữ liệu. Phương pháp tiếp cận CSV yêu cầu thoát Mua mang về Thích các phương pháp tích hợp sẵn cho khối lượng dữ liệu nhỏ Các phương pháp tích hợp dễ đọc hơn và ít có khả năng bị hỏng trong tương lai. Nếu bộ nhớ và thời gian không phải là vấn đề, hãy giữ cho nó đơn giản Làm cách nào để tải dữ liệu từ Python sang PostgreSQL?Python PostgreSQL CHÈN vào Bảng cơ sở dữ liệu . Cài đặt psycopg2 bằng pip Thứ hai, Thiết lập kết nối cơ sở dữ liệu PostgreSQL trong Python Tiếp theo, Xác định truy vấn Chèn. . Thực hiện truy vấn INSERT bằng cách sử dụng con trỏ. . Sau khi thực hiện thành công truy vấn, hãy cam kết các thay đổi của bạn với cơ sở dữ liệu Làm cách nào để xuất tệp CSV vào cơ sở dữ liệu PostgreSQL bằng Python?Đầu tiên, chúng tôi nhập gói psycopg2 và thiết lập kết nối tới cơ sở dữ liệu PostgreSQL bằng pyscopg2. phương thức connect() . trước khi nhập tệp CSV, chúng tôi cần tạo bảng. Trong ví dụ bên dưới, chúng tôi đã tạo một bảng bằng cách thực hiện lệnh SQL “tạo bảng” bằng cách sử dụng con trỏ.
Làm cách nào để chèn dữ liệu từ DataFrame vào PostgreSQL?Tạo một bảng trong DB postgres của bạn có số cột bằng với Dataframe (df) . Dữ liệu trong DF sẽ được chèn vào bảng postgres của bạn. Nếu bạn muốn thay thế bảng, chúng tôi có thể thay thế nó bằng phương thức to_sql bình thường bằng cách sử dụng các tiêu đề từ df của chúng tôi và sau đó tải toàn bộ df tốn nhiều thời gian vào DB.
Làm cách nào để ghi pandas dataframe vào PostgreSQL?hàm to_sql được sử dụng để ghi khung dữ liệu đã cho vào cơ sở dữ liệu SQL. . dữ liệu. tên của bảng lừa đảo. kết nối với cơ sở dữ liệu if_exists. nếu bảng tồn tại hay không. “thay thế” hoặc “nối thêm” mục lục. Đúng hay sai |