Tôi có một số khung dữ liệu gấu trúc khá lớn và tôi muốn sử dụng các ánh xạ SQL hàng loạt mới để tải chúng lên máy chủ Microsoft SQL thông qua SQL Alchemy. Phương thức pandas.to_sql, trong khi đẹp, chậm.

Tôi đang gặp khó khăn khi viết mã ...

Tôi muốn có thể vượt qua chức năng này một bản dữ liệu gấu trúc mà tôi đang gọi

from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO

address = 'postgresql://:@:/'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")    
4. Lý tưởng nhất, hàm sẽ 1.) Xóa bảng nếu nó đã tồn tại. 2.) Tạo một bảng mới 3.) Tạo một bản đồ và 4.) Chèn số lượng lớn bằng cách sử dụng dữ liệu Mapper và Pandas. Tôi bị mắc kẹt trong Phần 3.

Đây là mã (thừa nhận thô) của tôi. Tôi đang đấu tranh với cách làm cho chức năng Mapper hoạt động với các phím chính của mình. Tôi không thực sự cần các khóa chính nhưng chức năng Mapper yêu cầu nó.

Cảm ơn vì những hiểu biết.

from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e,reflect=True,schema=schema)
    Base = declarative_base(bind=e,metadata=m)
    t = Table(name,m)
    sqld = SQLDatabase(e, schema=schema,meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    class MyClass(Base):
    mapper(MyClass, sqlt)    

    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))

Tôi đã gặp phải một vấn đề tương tự với PD.TO_SQL mất hàng giờ để tải lên dữ liệu. Mã số dưới đây đã chèn cùng một dữ liệu trong vài giây.

from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO

address = 'postgresql://:@:/'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")    

Điều này có thể đã được trả lời sau đó, nhưng tôi đã tìm thấy giải pháp bằng cách đối chiếu các câu trả lời khác nhau trên trang web này và phù hợp với tài liệu của SQLalchemy.

  1. Bảng cần phải tồn tại trong DB1; với một chỉ mục được thiết lập với auto_increment trên.
  2. Hiện tại lớp cần phù hợp với DataFrame được nhập trong CSV và bảng trong DB1.

Hy vọng điều này sẽ giúp bất cứ ai đến đây và muốn trộn gấu trúc và sqlalchemy một cách nhanh chóng.

from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd

# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# The orient='records' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks.
listToWrite = df_to_be_written.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Inser the dataframe into the database in one bulk
conn.execute(table.insert(), listToWrite)

# Commit the changes

# Close the session

Dựa trên câu trả lời của @ansonw:

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create Table
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')

Tôi chèn 2000 dòng trong 5 giây thay vì 4 phút

Pandas 0.25.1 có thông số để thực hiện nhiều chất lọc, vì vậy không còn cần thiết phải giải quyết vấn đề này với SQLalchemy.


Trả lời có nguồn gốc từ tài liệu ở đây

Đáng chú ý rằng tôi chỉ thử nghiệm điều này với dịch chuyển đỏ. Vui lòng cho tôi biết nó diễn ra như thế nào trên cơ sở dữ liệu khác để tôi có thể cập nhật câu trả lời này.

Vì đây là khối lượng công việc nặng I/O, bạn cũng có thể sử dụng mô -đun ren python thông qua đa xử lý.dummy. Điều này tăng tốc mọi thứ đối với tôi:

import math
from multiprocessing.dummy import Pool as ThreadPool


def insert_df(df, *args, **kwargs):
    nworkers = 4

    chunksize = math.floor(df.shape[0] / nworkers)
    chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
    chunks.append((chunksize * nworkers, df.shape[0]))
    pool = ThreadPool(nworkers)

    def worker(chunk):
        i, j = chunk
        df.iloc[i:j, :].to_sql(*args, **kwargs)

    pool.map(worker, chunks)


insert_df(df, "foo_bar", engine, if_exists='append')

Đây là một phương pháp đơn giản

Tải xuống trình điều khiển cho kết nối cơ sở dữ liệu SQL

Đối với Linux và Mac OS:


Cho cửa sổ:


Tạo kết nối

from sqlalchemy import create_engine 
import urllib
server = '*****'
database = '********'
username = '**********'
password = '*********'

params = urllib.parse.quote_plus(
'DRIVER={ODBC Driver 17 for SQL Server};'+ 
'SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password) 

engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params) 

#Checking Connection 
connected = pd.io.sql._is_sqlalchemy_connectable(engine)

print(connected)   #Output is True if connection established successfully

Chèn dữ liệu

df.to_sql('Table_Name', con=engine, if_exists='append', index=False)

if_exists: {'fail', 'replace', 'append'}, default 'fail'
     fail: If table exists, do nothing.
     replace: If table exists, drop it, recreate it, and insert data.
     append: If table exists, insert data. Create if does not exist.

Nếu có nhiều hồ sơ

# limit based on sp_prepexec parameter count
tsql_chunksize = 2097 // len(bd_pred_score_100.columns)
# cap at 1000 (limit for number of rows inserted by table-value constructor)
tsql_chunksize = 1000 if tsql_chunksize > 1000 else tsql_chunksize

df.to_sql('table_name', con = engine, if_exists = 'append', index= False, chunksize=tsql_chunksize)

PS: Bạn có thể thay đổi các tham số theo yêu cầu của bạn.

Giải pháp cụ thể của Postgres của tôi bên dưới tự động tạo ra bảng cơ sở dữ liệu bằng cách sử dụng gấu trúc của bạn và thực hiện chèn số lượng lớn bằng cách sử dụng Postgres

import io

import pandas as pd
from sqlalchemy import create_engine

def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
    string_data_io = io.StringIO()
    df.to_csv(string_data_io, sep='|', index=False)
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                               index=False, if_exists=if_exists, schema=schema)
    string_data_io.readline()  # remove header
    with db_engine.connect() as connection:
        with connection.connection.cursor() as cursor:
            copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
            cursor.copy_expert(copy_cmd, string_data_io)

Đối với những người như tôi đang cố gắng thực hiện các giải pháp nói trên:

Pandas 0.24.0 hiện có TO_SQL với tùy chọn Chunksize và Phương thức = 'Multi' chèn với số lượng lớn ...

Điều này làm việc cho tôi để kết nối với cơ sở dữ liệu Oracle bằng CX_ORACLE và SQLALCHEMY

import sqlalchemy
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String
from sqlalchemy.orm import sessionmaker
import pandas as pd

# credentials
username = "username"
password = "password"
connectStr = "connection:/string"
tableName = "tablename"

t0 = time.time()

# connection
dsn = cx_Oracle.makedsn('host','port',service_name='servicename')

Base = declarative_base()

    __tablename__ = 'tablename'

    DOCUMENTNUM = Column(String(500), primary_key=True)
    DOCUMENTTYPE = Column(String(500))
    FILENUM = Column(String(500))
    LEASEPAYOR = Column(String(500))
    LEASESTATUS = Column(String(500))
    PROSPECT = Column(String(500))
    SPLIT = Column(String(500))
    SPLITSTATUS = Column(String(500))

engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (username, password, dsn))
conn = engine.connect()  

Base.metadata.bind = engine

# Creating the session

DBSession = sessionmaker(bind=engine)

session = DBSession()

# Bulk insertion
data = pd.read_csv('data.csv')
lists = data.to_dict(orient='records')

table = sqlalchemy.Table('landmanmineral', Base.metadata, autoreload=True)
conn.execute(table.insert(), lists)



print("time taken %8.8f seconds" % (time.time() - t0) )

Mã dưới đây có thể giúp bạn, tôi đã phải đối mặt với cùng một vấn đề trong khi tải các bản ghi 695.000k

Phương pháp cắt ngắn bảng trước khi tải Truncate the table before load

Phụ thuộc vào yêu cầu, làm cộng đồng/thay thế

