Avec cela, vous pouvez également faire des jointures sur des CSV :
import os
import sqlite3
from typing import List, Optional

import pandas as pd
class CSVDriver:
    """Run SQL queries (including joins) against a directory of CSV files.

    Each ``<table>.csv`` file found in ``table_dir_path`` is loaded on demand
    into an in-memory SQLite database the first time a query references it.
    """

    def __init__(self, table_dir_path: str):
        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
        self._con = None  # created lazily by the ``con`` property

    @property
    def con(self) -> sqlite3.Connection:
        """Return the single in-memory SQLite connection, creating it on first use."""
        if self._con is None:
            self._con = sqlite3.connect(":memory:")
        return self._con

    def _exists(self, table: str) -> bool:
        """Return True if ``table`` is already present in the in-memory database."""
        # fetchall() yields 1-tuples such as ('account',), so comparing a bare
        # table name against those rows never matches. Ask SQLite for the
        # specific name instead.
        query = """
            SELECT 1
            FROM sqlite_master
            WHERE type = 'table'
              AND name = ?
              AND name NOT LIKE 'sqlite_%'
            LIMIT 1;
        """
        return self.con.execute(query, (table,)).fetchone() is not None

    def _load_table_to_mem(self, table: str, sep: Optional[str] = None) -> None:
        """
        Load a CSV into an in-memory SQLite database

        sep is set to None in order to force pandas to auto-detect the delimiter

        Raises:
            ValueError: if ``<table>.csv`` does not exist in ``table_dir_path``.
        """
        if self._exists(table):
            return  # already loaded; avoid re-reading the CSV

        file_name = table + ".csv"
        path = os.path.join(self.table_dir_path, file_name)
        if not os.path.exists(path):
            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")

        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)

    def query(self, query: str) -> List[tuple]:
        """
        Run an SQL query on CSV file(s).

        Tables are loaded from table_dir_path. Returns every row fetched for
        the query.
        """
        for table in extract_tables(query):
            if table is None:
                # References without a real name (e.g. aliased sub-queries)
                # have no CSV file behind them.
                continue
            self._load_table_to_mem(table)

        return self.con.execute(query).fetchall()
extract_tables() :
import sqlparse
from sqlparse.sql import IdentifierList, Identifier, Function
from sqlparse.tokens import Keyword, DML
from collections import namedtuple
import itertools
class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
    """A table-like reference found in a SQL statement.

    Fields: ``schema`` (qualifier or None), ``name`` (real name or None),
    ``alias`` (alias or None) and ``is_function`` (truthy for function calls).
    """

    __slots__ = ()

    def has_alias(self):
        """Return True when the reference carries an alias."""
        return not (self.alias is None)

    @property
    def is_query_alias(self):
        """True when there is an alias but no underlying name."""
        return self.alias is not None and self.name is None

    @property
    def is_table_alias(self):
        """True for a named, aliased reference that is not a function."""
        return not (self.name is None or self.alias is None or self.is_function)

    @property
    def full_name(self):
        """The name, prefixed with ``schema.`` when a schema is set."""
        return self.name if self.schema is None else self.schema + '.' + self.name
def _is_subselect(parsed):
    """Return True if ``parsed`` is a token group holding a DML keyword.

    Only a DML-typed token whose upper-cased value is SELECT, INSERT, UPDATE,
    CREATE or DELETE counts; non-group tokens are never sub-selects.
    """
    if not parsed.is_group:
        return False

    statement_keywords = ('SELECT', 'INSERT', 'UPDATE', 'CREATE', 'DELETE')
    return any(
        token.ttype is DML and token.value.upper() in statement_keywords
        for token in parsed.tokens
    )
def _identifier_is_function(identifier):
    """Return True if any direct child token of ``identifier`` is a Function."""
    for token in identifier.tokens:
        if isinstance(token, Function):
            return True
    return False
def _extract_from_part(parsed):
    """Yield the tokens that follow a table-introducing keyword.

    Walks ``parsed.tokens`` in order. Every group token is first searched
    recursively. Once a keyword such as FROM, INTO, UPDATE, TABLE, COPY or any
    ``... JOIN`` has been seen, subsequent tokens are yielded (sub-selects are
    expanded recursively instead) until another keyword ends the table list.
    """
    table_keywords = ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE')
    expecting_table = False

    for token in parsed.tokens:
        if token.is_group:
            yield from _extract_from_part(token)

        if expecting_table:
            if _is_subselect(token):
                yield from _extract_from_part(token)
            # An incomplete nested select is not recognized as a sub-select,
            # e.g. 'SELECT * FROM (SELECT id FROM user', so a second FROM can
            # show up here; it must not end the table list. The same goes for
            # JOIN and its variants (INNER JOIN, FULL OUTER JOIN, ...) in
            # 'SELECT * FROM abc JOIN def'.
            elif token.ttype is Keyword and not (
                    token.value.upper() == 'FROM'
                    or token.value.upper().endswith('JOIN')):
                expecting_table = False
            else:
                yield token
            continue

        if token.ttype is Keyword or token.ttype is Keyword.DML:
            keyword = token.value.upper()
            if keyword.endswith('JOIN') or keyword in table_keywords:
                expecting_table = True
        elif isinstance(token, IdentifierList):
            # 'SELECT a, FROM abc' puts FROM inside the column list, so look
            # for it among the list's identifiers as well.
            if any(
                member.ttype is Keyword and member.value.upper() == 'FROM'
                for member in token.get_identifiers()
            ):
                expecting_table = True
def _extract_table_identifiers(token_stream):
    """Yield a ``Reference`` for each table-like token in ``token_stream``.

    Identifier lists are expanded member by member; members that lack the
    name/alias accessors or have no real name are skipped. Single identifiers
    and functions are converted directly.
    """
    for token in token_stream:
        if isinstance(token, IdentifierList):
            for member in token.get_identifiers():
                try:
                    member_alias = member.get_alias()
                    member_schema = member.get_parent_name()
                    member_name = member.get_real_name()
                except AttributeError:
                    # Plain tokens inside the list do not expose these accessors.
                    continue
                if not member_name:
                    continue
                yield Reference(member_schema, member_name, member_alias,
                                _identifier_is_function(member))
        elif isinstance(token, (Identifier, Function)):
            yield Reference(token.get_parent_name(), token.get_real_name(),
                            token.get_alias(), _identifier_is_function(token))
def extract_tables(sql):
    """Return the names of the tables referenced by ``sql`` as a flat list.

    ``sql`` may hold several statements; names are collected statement by
    statement, in order, without de-duplication.
    """
    table_names = []
    for statement in sqlparse.parse(sql):
        from_tokens = _extract_from_part(statement)
        table_names.extend(
            ref.name for ref in _extract_table_identifiers(from_tokens)
        )
    return table_names
Exemple (en supposant que account.csv
et tojoin.csv
existent dans /path/to/files
):
# Directory containing the CSV files; each file <name>.csv is exposed as table <name>.
db_path = r"/path/to/files"
driver = CSVDriver(db_path)
# Join two CSV-backed tables; the table names match the CSV file names.
query = """
SELECT tojoin.col_to_join
FROM account
LEFT JOIN tojoin
ON account.a = tojoin.a
"""
# Loads the referenced CSV files into the in-memory database, then returns the fetched rows.
driver.query(query)
4 votes
Veuillez fournir la commande réelle qui n'a pas fonctionné et le message d'erreur réel. "import...." peut être n'importe quoi. "cannot work" est trop vague pour que nous puissions deviner le problème. Sans détails, nous ne pouvons pas vous aider.
3 votes
La commande réelle, comme je l'ai dit, est ".import", et le message indique une erreur de syntaxe près de ".import".
14 votes
Veuillez afficher la commande réelle dans la question. Veuillez afficher le message d'erreur réel dans la question. Veuillez ne pas ajouter de commentaires qui ne font que répéter les choses. Veuillez mettre à jour la question avec un copier-coller de ce que vous faites réellement.
0 votes
FWIW
.import
est la commande pour importer des fichiers dans le shell interactif SQLite.