Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 3.1.0 - Evgeniy Skomorokhov <[email protected]> Wed, 28 Jun 2017 11:12:33 +0700
- Added: support clickhouse db using clickhouse-driver

### 3.0.0 - Wandenberg Vieira Peixoto <[email protected]> Sun, 29 Jan 2017 17:00:00 -0300
- Added: support to python 3.6 (test can be executed with python 2.7 and 3.6)

Expand Down
4 changes: 4 additions & 0 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ END
/
```

h3. Support ClickHouse

Thanks Konstantin Lebedev for clickhouse-driver python library.

h2. Roadmap, bug reporting and feature requests

For detailed info about future versions, bug reporting and feature requests, go to "issues":https://github.com/guilhermechapiewski/simple-db-migrate/issues page.
Expand Down
123 changes: 123 additions & 0 deletions example/clickhouse/20170628130328_init_db.migration
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#-*- coding:utf-8 -*-
SQL_UP = u"""
CREATE TABLE IF NOT EXISTS ontime
(
Year UInt16,
Quarter UInt8,
Month UInt8,
DayofMonth UInt8,
DayOfWeek UInt8,
FlightDate Date,
UniqueCarrier FixedString(7),
AirlineID Int32,
Carrier FixedString(2),
TailNum String,
FlightNum String,
OriginAirportID Int32,
OriginAirportSeqID Int32,
OriginCityMarketID Int32,
Origin FixedString(5),
OriginCityName String,
OriginState FixedString(2),
OriginStateFips String,
OriginStateName String,
OriginWac Int32,
DestAirportID Int32,
DestAirportSeqID Int32,
DestCityMarketID Int32,
Dest FixedString(5),
DestCityName String,
DestState FixedString(2),
DestStateFips String,
DestStateName String,
DestWac Int32,
CRSDepTime Int32,
DepTime Int32,
DepDelay Int32,
DepDelayMinutes Int32,
DepDel15 Int32,
DepartureDelayGroups String,
DepTimeBlk String,
TaxiOut Int32,
WheelsOff Int32,
WheelsOn Int32,
TaxiIn Int32,
CRSArrTime Int32,
ArrTime Int32,
ArrDelay Int32,
ArrDelayMinutes Int32,
ArrDel15 Int32,
ArrivalDelayGroups Int32,
ArrTimeBlk String,
Cancelled UInt8,
CancellationCode FixedString(1),
Diverted UInt8,
CRSElapsedTime Int32,
ActualElapsedTime Int32,
AirTime Int32,
Flights Int32,
Distance Int32,
DistanceGroup UInt8,
CarrierDelay Int32,
WeatherDelay Int32,
NASDelay Int32,
SecurityDelay Int32,
LateAircraftDelay Int32,
FirstDepTime String,
TotalAddGTime String,
LongestAddGTime String,
DivAirportLandings String,
DivReachedDest String,
DivActualElapsedTime String,
DivArrDelay String,
DivDistance String,
Div1Airport String,
Div1AirportID Int32,
Div1AirportSeqID Int32,
Div1WheelsOn String,
Div1TotalGTime String,
Div1LongestGTime String,
Div1WheelsOff String,
Div1TailNum String,
Div2Airport String,
Div2AirportID Int32,
Div2AirportSeqID Int32,
Div2WheelsOn String,
Div2TotalGTime String,
Div2LongestGTime String,
Div2WheelsOff String,
Div2TailNum String,
Div3Airport String,
Div3AirportID Int32,
Div3AirportSeqID Int32,
Div3WheelsOn String,
Div3TotalGTime String,
Div3LongestGTime String,
Div3WheelsOff String,
Div3TailNum String,
Div4Airport String,
Div4AirportID Int32,
Div4AirportSeqID Int32,
Div4WheelsOn String,
Div4TotalGTime String,
Div4LongestGTime String,
Div4WheelsOff String,
Div4TailNum String,
Div5Airport String,
Div5AirportID Int32,
Div5AirportSeqID Int32,
Div5WheelsOn String,
Div5TotalGTime String,
Div5LongestGTime String,
Div5WheelsOff String,
Div5TailNum String
)
ENGINE = MergeTree(FlightDate, (Year, FlightDate), 8192);

"""

SQL_DOWN = u"""

DROP TABLE IF EXISTS ontime;

"""
11 changes: 11 additions & 0 deletions example/clickhouse/20170628162013_patch.migration
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#-*- coding:utf-8 -*-
SQL_UP = u"""

ALTER TABLE ontime ADD COLUMN custom_data String;

"""

SQL_DOWN = u"""

ALTER TABLE ontime DROP COLUMN custom_data;
"""
7 changes: 7 additions & 0 deletions example/clickhouse/simple-db-migrate.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DATABASE_HOST = "localhost"
DATABASE_USER = "default"
DATABASE_PASSWORD = ""
DATABASE_NAME = "default"
DATABASE_MIGRATIONS_DIR = "."
DATABASE_PORT=9000
DATABASE_VERSION_TABLE="db_migrations"
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,4 @@
'db-migrate = simple_db_migrate:run_from_argv',
],
},

)
2 changes: 1 addition & 1 deletion simple_db_migrate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .config import FileConfig, Config
from .main import Main

SIMPLE_DB_MIGRATE_VERSION = '3.0.0'
SIMPLE_DB_MIGRATE_VERSION = '3.1.0'

# fixing print in non-utf8 terminals
if hasattr(sys.stdout, 'encoding') and sys.stdout.encoding != 'UTF-8':
Expand Down
181 changes: 181 additions & 0 deletions simple_db_migrate/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import re
from .core import Migration
from .core.exceptions import MigrationException
from .helpers import Utils

class ClickHouse(object):
__re_objects = re.compile("(?ims)(?P<pre>.*?)(?P<main>create[ \n\t\r]*(definer[ \n\t\r]*=[ \n\t\r]*[^ \n\t\r]*[ \n\t\r]*)?(trigger|function|procedure).*?)\n[ \n\t\r]*/([ \n\t\r]+(?P<pos>.*)|$)")

def __init__(self, config=None, driver=None):
self.__clickhouse_script_encoding = config.get("database_script_encoding", "utf8")
self.__clickhouse_encoding = config.get("database_encoding", "utf8")
self.__clickhouse_host = config.get("database_host")
self.__clickhouse_port = config.get("database_port", 9000)
self.__clickhouse_user = config.get("database_user")
self.__clickhouse_passwd = config.get("database_password")
self.__clickhouse_db = config.get("database_name")
self.__version_table = config.get("database_version_table")

if config.get("drop_db_first"):
self._drop_database()
self.__driver = driver
if not driver:
import clickhouse_driver
self.__clickhouse_driver = clickhouse_driver
self.__conn = self.__clickhouse_driver.Client(
self.__clickhouse_host,
self.__clickhouse_port,
self.__clickhouse_db,
self.__clickhouse_user,
self.__clickhouse_passwd)
self._create_database_if_not_exists()
self._create_version_table_if_not_exists()

def __clickhouse_connect(self, connect_using_database_name=True):
try:
return self.__conn
except Exception as e:
raise Exception("could not connect to database: %s" % e)

def __execute(self, sql, execution_log=None):
db = self.__clickhouse_connect()
try:
statements = ClickHouse._parse_sql_statements(sql)
if len(sql.strip(' \t\n\r')) != 0 and len(statements) == 0:
raise Exception("invalid sql syntax '%s'" % Utils.encode(sql, "utf-8"))

for statement in statements:
curr_statement = statement
affected_rows = db.execute(Utils.encode(statement, self.__clickhouse_script_encoding))
if execution_log:
execution_log("%s\n-- %d row(s) affected\n" % (statement, affected_rows and len(affected_rows) or 0))
return affected_rows
except Exception as e:
raise MigrationException("error executing migration: %s" % e, curr_statement)

def __change_db_version(self, version, migration_file_name, sql_up, sql_down, up=True, execution_log=None, label_version=None):
db = self.__clickhouse_connect()
id = 1
rows = db.execute("select max(id) from %s; " % self.__version_table)
if rows and len(rows) > 0:
id = rows[0][0] + 1
if up:
if not label_version:
label_version = "NULL"
else:
label_version = "\"%s\"" % (str(label_version))
# moving up and storing history
data = [[id, str(version), label_version, migration_file_name, sql_up.replace('"', '\\"'), sql_down.replace('"', '\\"')]]
db.process_insert_query("INSERT INTO %s (id, version, label, name, sql_up, sql_down) VALUES " % self.__version_table, data)
else:
# moving down and deleting from history
rows = db.execute("select id from %s_active WHERE version = '%s' ORDER BY id DESC LIMIT 1;" % (self.__version_table, version))
if rows and len(rows) > 0:
id = rows[0][0]
#sql = "delete from %s where version = \"%s\";" % (self.__version_table, str(version))
sql = "INSERT INTO %s_deleted (id) VALUES " % (self.__version_table,)
db.process_insert_query(sql, [[id]])
else:
raise Exception("Version %s didn't found" % version)

@classmethod
def _parse_sql_statements(cls, migration_sql):
all_statements = []
last_statement = ''

match_stmt = ClickHouse.__re_objects.match(migration_sql)

if match_stmt and match_stmt.re.groups > 0:
if match_stmt.group('pre'):
all_statements = all_statements + ClickHouse._parse_sql_statements(match_stmt.group('pre'))
if match_stmt.group('main'):
all_statements.append(match_stmt.group('main'))
if match_stmt.group('pos'):
all_statements = all_statements + ClickHouse._parse_sql_statements(match_stmt.group('pos'))

else:
for statement in migration_sql.split(';'):
if len(last_statement) > 0:
curr_statement = '%s;%s' % (last_statement, statement)
else:
curr_statement = statement

count = Utils.count_occurrences(curr_statement)
single_quotes = count.get("'", 0)
double_quotes = count.get('"', 0)
left_parenthesis = count.get('(', 0)
right_parenthesis = count.get(')', 0)

if single_quotes % 2 == 0 and double_quotes % 2 == 0 and left_parenthesis == right_parenthesis:
all_statements.append(curr_statement)
last_statement = ''
else:
last_statement = curr_statement

return [s.strip() for s in all_statements if ((s.strip() != "") and (last_statement == ""))]

def _drop_database(self):
try:
self.__execute("DROP DATABASE IF EXISTS %s" % (self.__clickhouse_db,))
except Exception as e:
raise Exception("can't drop database '%s'; \n%s" % (self.__clickhouse_db, str(e)))

def _create_database_if_not_exists(self):
self.__execute("CREATE DATABASE IF NOT EXISTS %s" % (self.__clickhouse_db,))
self.__execute("USE %s" % (self.__clickhouse_db,))

def _create_version_table_if_not_exists(self):
# create version table
sql = "CREATE TABLE IF NOT EXISTS %s ( id Int32, version String, label String, name String, sql_up String, sql_down String, install_delete Int32, date_time DateTime DEFAULT now()) ENGINE = Log " % self.__version_table
self.__execute(sql)
sql = "CREATE TABLE IF NOT EXISTS %s_deleted ( id Int32, date_time DateTime DEFAULT now()) ENGINE = Log " % self.__version_table
self.__execute(sql)
sql = "CREATE VIEW IF NOT EXISTS %s_active AS SELECT * FROM db_migrations where not (id IN (select id from %s_deleted ))" % (self.__version_table, self.__version_table,)
self.__execute(sql)

def change(self, sql, new_db_version, migration_file_name, sql_up, sql_down, up=True, execution_log=None, label_version=None):
self.__execute(sql, execution_log)
self.__change_db_version(new_db_version, migration_file_name, sql_up, sql_down, up, execution_log, label_version)

def get_current_schema_version(self):
db = self.__clickhouse_connect()
rows = db.execute("SELECT version FROM %s_active order by id desc limit 0,1;" % (self.__version_table,))
version = ""
if rows and len(rows) > 0:
version = rows[0][0]
return version

def get_all_schema_versions(self):
versions = []
rows = self.__execute("SELECT version FROM %s_active order by id;" % self.__version_table)
for row in rows:
versions.append(row[0])
versions.sort()
return versions

def get_version_id_from_version_number(self, version):
rows = self.__execute("SELECT id FROM %s_active where version = '%s' order by id desc;" % (self.__version_table, version))
_id = None
if len(rows) > 0:
_id = rows[0][0]
return _id

def get_version_number_from_label(self, label):
rows = self.__execute("SELECT version FROM %s_active where label = '%s' order by id desc" % (self.__version_table, label))
version = ""
if len(rows) > 0:
version = rows[0][0]
return version

def get_all_schema_migrations(self):
migrations = []
all_migrations = self.__execute("SELECT id, version, label, name, sql_up, sql_down FROM %s_active order by id;" % self.__version_table)
for migration_db in all_migrations:
migration = Migration(id = int(migration_db[0]),
version = migration_db[1] and str(migration_db[1]) or None,
label = migration_db[2] and str(migration_db[2]) or None,
file_name = migration_db[3] and str(migration_db[3]) or None,
sql_up = Migration.ensure_sql_unicode(migration_db[4], self.__clickhouse_script_encoding),
sql_down = Migration.ensure_sql_unicode(migration_db[5], self.__clickhouse_script_encoding))
migrations.append(migration)
return migrations
3 changes: 3 additions & 0 deletions simple_db_migrate/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def __init__(self, config, sgdb=None):
elif self.config.get("database_engine") == 'mssql':
from .mssql import MSSQL
self.sgdb = MSSQL(config)
elif self.config.get("database_engine") == 'clickhouse':
from .clickhouse import ClickHouse
self.sgdb = ClickHouse(config)
else:
raise Exception("engine not supported '%s'" % self.config.get("database_engine"))

Expand Down