147 votes

Importer un fichier CSV dans une table de base de données sqlite3 en utilisant Python

J'ai un fichier CSV et je veux l'importer en masse dans ma base de données sqlite3 en utilisant Python. La commande est ".import .....". Mais il semble que cela ne puisse pas fonctionner ainsi. Quelqu'un peut-il me donner un exemple de comment le faire dans sqlite3 ? J'utilise Windows, au cas où. Merci

4 votes

Veuillez fournir le réel qui n'a pas fonctionné et la commande réel message d'erreur. "import...." peut être n'importe quoi. "cannot work" est trop vague pour que nous puissions le deviner. Sans détails, nous ne pouvons pas vous aider.

3 votes

La commande réelle comme je l'ai dit est ".import" et il dit erreur de syntaxe nouveau ".import".

14 votes

Veuillez afficher la commande réelle dans la question. Veuillez afficher le message d'erreur réel dans la question. Veuillez ne pas ajouter de commentaires qui ne font que répéter les choses. Veuillez mettre à jour la question avec un copier-coller de ce que vous faites réellement.

4voto

Kathirmani Points 524

Vous pouvez le faire en utilisant blaze & odo efficacement

import blaze as bz
csv_path = 'data.csv'
bz.odo(csv_path, 'sqlite:///data.db::data')

Odo stockera le fichier csv à data.db (base de données sqlite) sous le schéma data

Ou vous utilisez odo directement, sans blaze . L'un ou l'autre, c'est bien. Lisez ceci documentation

2 votes

Bz non défini :P

0 votes

Et c'est probablement un très vieux paquet à cause de son erreur intérieure : AttributeError : L'objet 'SubDiGraph' n'a pas d'attribut 'edge'.

0 votes

J'obtiens également la même erreur d'attribut : il semble qu'il y ait des commentaires sur GitHub à ce sujet, cependant

4voto

Jace Points 61

Basé sur la solution de Guy L (je l'adore), mais peut gérer les champs échappés.

import csv, sqlite3

def _get_col_datatypes(fin):
    dr = csv.DictReader(fin) # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]        
        if not feildslLeft: break # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
        # TODO: Currently there's no support for DATE in sqllite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes

def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")

def csvToDb(csvFile,dbFile,tablename, outputToFile = False):

    # TODO: implement output to file

    with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("\"%s\" %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
        print(stmt)
        con = sqlite3.connect(dbFile)
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)

        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()
        con.close()

3voto

Ashkan Points 73

Vous pouvez également ajouter le nom des champs en fonction de l'en-tête du CSV :

import sqlite3

def csv_sql(file_dir,table_name,database_name):
    con = sqlite3.connect(database_name)
    cur = con.cursor()
    # Drop the current table by: 
    # cur.execute("DROP TABLE IF EXISTS %s;" % table_name)

    with open(file_dir, 'r') as fl:
        hd = fl.readline()[:-1].split(',')
        ro = fl.readlines()
        db = [tuple(ro[i][:-1].split(',')) for i in range(len(ro))]

    header = ','.join(hd)
    cur.execute("CREATE TABLE IF NOT EXISTS %s (%s);" % (table_name,header))
    cur.executemany("INSERT INTO %s (%s) VALUES (%s);" % (table_name,header,('?,'*len(hd))[:-1]), db)
    con.commit()
    con.close()

# Example:
csv_sql('./surveys.csv','survey','eco.db')

2voto

jcomeau_ictx Points 15736

Dans l'intérêt de la simplicité, vous pourriez utiliser l'outil de ligne de commande sqlite3 à partir du Makefile de votre projet.

%.sql3: %.csv
    rm -f $@
    sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"
%.dump: %.sql3
    sqlite3 $< "select * from $*"

make test.sql3 crée ensuite la base de données sqlite à partir d'un fichier test.csv existant, avec une seule table "test". vous pouvez alors make test.dump pour vérifier le contenu.

2voto

Michał Zawadzki Points 189

Avec cela, vous pouvez également faire des jointures sur des CSV :

import sqlite3
import os
import pandas as pd
from typing import List

class CSVDriver:
    def __init__(self, table_dir_path: str):
        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
        self._con = None

    @property
    def con(self) -> sqlite3.Connection:
        """Make a singleton connection to an in-memory SQLite database"""
        if not self._con:
            self._con = sqlite3.connect(":memory:")
        return self._con

    def _exists(self, table: str) -> bool:
        query = """
        SELECT name
        FROM sqlite_master 
        WHERE type ='table'
        AND name NOT LIKE 'sqlite_%';
        """
        tables = self.con.execute(query).fetchall()
        return table in tables

    def _load_table_to_mem(self, table: str, sep: str = None) -> None:
        """
        Load a CSV into an in-memory SQLite database
        sep is set to None in order to force pandas to auto-detect the delimiter
        """
        if self._exists(table):
            return
        file_name = table + ".csv"
        path = os.path.join(self.table_dir_path, file_name)
        if not os.path.exists(path):
            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")
        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)

    def query(self, query: str) -> List[tuple]:
        """
        Run an SQL query on CSV file(s). 
        Tables are loaded from table_dir_path
        """
        tables = extract_tables(query)
        for table in tables:
            self._load_table_to_mem(table)
        cursor = self.con.cursor()
        cursor.execute(query)
        records = cursor.fetchall()
        return records

extract_tables() :

import sqlparse
from sqlparse.sql import IdentifierList, Identifier,  Function
from sqlparse.tokens import Keyword, DML
from collections import namedtuple
import itertools

class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
    __slots__ = ()

    def has_alias(self):
        return self.alias is not None

    @property
    def is_query_alias(self):
        return self.name is None and self.alias is not None

    @property
    def is_table_alias(self):
        return self.name is not None and self.alias is not None and not self.is_function

    @property
    def full_name(self):
        if self.schema is None:
            return self.name
        else:
            return self.schema + '.' + self.name

def _is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
                                                        'UPDATE', 'CREATE', 'DELETE'):
            return True
    return False

def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)

def _extract_from_part(parsed):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if item.is_group:
            for x in _extract_from_part(item):
                yield x
        if tbl_prefix_seen:
            if _is_subselect(item):
                for x in _extract_from_part(item):
                    yield x
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition resulting in a
            # StopIteration. So we need to ignore the keyword if the keyword
            # FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break

def _extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for ident in item.get_identifiers():
                try:
                    alias = ident.get_alias()
                    schema_name = ident.get_parent_name()
                    real_name = ident.get_real_name()
                except AttributeError:
                    continue
                if real_name:
                    yield Reference(schema_name, real_name,
                                    alias, _identifier_is_function(ident))
        elif isinstance(item, Identifier):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))
        elif isinstance(item, Function):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))

def extract_tables(sql):
    # let's handle multiple statements in one sql string
    extracted_tables = []
    statements = list(sqlparse.parse(sql))
    for statement in statements:
        stream = _extract_from_part(statement)
        extracted_tables.append([ref.name for ref in _extract_table_identifiers(stream)])
    return list(itertools.chain(*extracted_tables))

Exemple (en supposant que account.csv y tojoin.csv existent dans /path/to/files ):

db_path = r"/path/to/files"
driver = CSVDriver(db_path)
query = """
SELECT tojoin.col_to_join 
FROM account
LEFT JOIN tojoin
ON account.a = tojoin.a
"""
driver.query(query)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X