Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
package_dir={"": "src"},
packages=["cs50"],
url="https://github.com/cs50/python-cs50",
version="7.0.1"
version="7.0.2"
)
13 changes: 8 additions & 5 deletions src/cs50/_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,21 @@ def _substitute_named_or_pyformat_markers(self):
Raises a ``RuntimeError`` if any parameters are missing or unused.
"""

unused_params = set(self._kwargs.keys())
unused_params = {param_name: True for param_name in self._kwargs.keys()}
for token_index, param_name in self._placeholders.items():
if param_name not in self._kwargs:
raise RuntimeError(f"missing value for placeholder ({param_name})")

self._tokens[token_index] = self._kwargs[param_name]
unused_params.remove(param_name)
unused_params[param_name] = False

if len(unused_params) > 0:
joined_unused_params = get_human_readable_list(sorted(unused_params))
sorted_unique_unused_param_names = sorted(set(
param_name for param_name, unused in unused_params.items() if unused))
if len(sorted_unique_unused_param_names) > 0:
joined_unused_params = get_human_readable_list(sorted_unique_unused_param_names)
raise RuntimeError(
f"unused value{'' if len(unused_params) == 1 else 's'} ({joined_unused_params})")
f"unused value{'' if len(sorted_unique_unused_param_names) == 1 else 's'}"
+ " ({joined_unused_params})")

def _escape_verbatim_colons(self):
"""Escapes verbatim colons from string literal and identifier tokens so they aren't treated
Expand Down
115 changes: 60 additions & 55 deletions tests/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def test_delete_returns_affected_rows(self):
def test_insert_returns_last_row_id(self):
self.assertEqual(self.db.execute("INSERT INTO cs50(val) VALUES('foo')"), 1)
self.assertEqual(self.db.execute("INSERT INTO cs50(val) VALUES('bar')"), 2)
self.assertEqual(self.db.execute("INSERT INTO cs50(val) VALUES('qux')"), 3)

def test_select_all(self):
self.assertEqual(self.db.execute("SELECT * FROM cs50"), [])
Expand Down Expand Up @@ -131,55 +132,13 @@ def test_rollback(self):
def test_identifier_case(self):
self.assertIn("count", self.db.execute("SELECT 1 AS count")[0])

def tearDown(self):
self.db.execute("DROP TABLE IF EXISTS cs50")
self.db.execute("DROP TABLE IF EXISTS foo")
self.db.execute("DROP TABLE IF EXISTS bar")

class MySQLTests(SQLTests):
@classmethod
def setUpClass(self):
self.db = SQL("mysql://[email protected]/test")

def setUp(self):
self.db.execute("CREATE TABLE IF NOT EXISTS cs50 (id INTEGER NOT NULL AUTO_INCREMENT, val VARCHAR(16), bin BLOB, PRIMARY KEY (id))")
self.db.execute("DELETE FROM cs50")

class PostgresTests(SQLTests):
@classmethod
def setUpClass(self):
self.db = SQL("postgresql://postgres:[email protected]/test")

def setUp(self):
self.db.execute("CREATE TABLE IF NOT EXISTS cs50 (id SERIAL PRIMARY KEY, val VARCHAR(16), bin BYTEA)")
self.db.execute("DELETE FROM cs50")

def test_cte(self):
self.assertEqual(self.db.execute("WITH foo AS ( SELECT 1 AS bar ) SELECT bar FROM foo"), [{"bar": 1}])

def test_postgres_scheme(self):
db = SQL("postgres://postgres:[email protected]/test")
db.execute("SELECT 1")

class SQLiteTests(SQLTests):
@classmethod
def setUpClass(self):
open("test.db", "w").close()
self.db = SQL("sqlite:///test.db")

def setUp(self):
self.db.execute("CREATE TABLE IF NOT EXISTS cs50 (id INTEGER PRIMARY KEY, val TEXT, bin BLOB)")
self.db.execute("DELETE FROM cs50")

def test_lastrowid(self):
self.db.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY AUTOINCREMENT, firstname TEXT, lastname TEXT)")
self.assertEqual(self.db.execute("INSERT INTO foo (firstname, lastname) VALUES('firstname', 'lastname')"), 1)
self.assertRaises(ValueError, self.db.execute, "INSERT INTO foo (id, firstname, lastname) VALUES(1, 'firstname', 'lastname')")
self.assertEqual(self.db.execute("INSERT OR IGNORE INTO foo (id, firstname, lastname) VALUES(1, 'firstname', 'lastname')"), None)
def test_none(self):
self.db.execute("CREATE TABLE foo (val INTEGER)")
self.db.execute("SELECT * FROM foo WHERE val = ?", None)

def test_integrity_constraints(self):
self.db.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY)")
self.assertEqual(self.db.execute("INSERT INTO foo VALUES(1)"), 1)
self.db.execute("INSERT INTO foo VALUES(1)")
self.assertRaises(ValueError, self.db.execute, "INSERT INTO foo VALUES(1)")

def test_foreign_key_support(self):
Expand All @@ -188,7 +147,7 @@ def test_foreign_key_support(self):
self.assertRaises(ValueError, self.db.execute, "INSERT INTO bar VALUES(50)")

def test_qmark(self):
self.db.execute("CREATE TABLE foo (firstname STRING, lastname STRING)")
self.db.execute("CREATE TABLE foo (firstname VARCHAR(255), lastname VARCHAR(255))")

self.db.execute("INSERT INTO foo VALUES (?, 'bar')", "baz")
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "baz", "lastname": "bar"}])
Expand Down Expand Up @@ -218,7 +177,7 @@ def test_qmark(self):
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "bar", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("CREATE TABLE bar (firstname STRING)")
self.db.execute("CREATE TABLE bar (firstname VARCHAR(255))")

self.db.execute("INSERT INTO bar VALUES (?)", "baz")
self.assertEqual(self.db.execute("SELECT * FROM bar"), [{"firstname": "baz"}])
Expand All @@ -242,7 +201,7 @@ def test_qmark(self):
self.assertRaises(RuntimeError, self.db.execute, "INSERT INTO foo VALUES (?, ?)", 'bar', baz='baz')

def test_named(self):
self.db.execute("CREATE TABLE foo (firstname STRING, lastname STRING)")
self.db.execute("CREATE TABLE foo (firstname VARCHAR(255), lastname VARCHAR(255))")

self.db.execute("INSERT INTO foo VALUES (:baz, 'bar')", baz="baz")
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "baz", "lastname": "bar"}])
Expand All @@ -264,7 +223,11 @@ def test_named(self):
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "bar", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("CREATE TABLE bar (firstname STRING)")
self.db.execute("INSERT INTO foo VALUES (:baz, :baz)", baz="baz")
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "baz", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("CREATE TABLE bar (firstname VARCHAR(255))")
self.db.execute("INSERT INTO bar VALUES (:baz)", baz="baz")
self.assertEqual(self.db.execute("SELECT * FROM bar"), [{"firstname": "baz"}])

Expand All @@ -274,7 +237,7 @@ def test_named(self):
self.assertRaises(RuntimeError, self.db.execute, "INSERT INTO foo VALUES (:bar, :baz)", 'baz', bar='bar')

def test_numeric(self):
self.db.execute("CREATE TABLE foo (firstname STRING, lastname STRING)")
self.db.execute("CREATE TABLE foo (firstname VARCHAR(255), lastname VARCHAR(255))")

self.db.execute("INSERT INTO foo VALUES (:1, 'bar')", "baz")
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "baz", "lastname": "bar"}])
Expand All @@ -296,7 +259,7 @@ def test_numeric(self):
self.assertEqual(self.db.execute("SELECT * FROM foo"), [{"firstname": "bar", "lastname": "baz"}])
self.db.execute("DELETE FROM foo")

self.db.execute("CREATE TABLE bar (firstname STRING)")
self.db.execute("CREATE TABLE bar (firstname VARCHAR(255))")
self.db.execute("INSERT INTO bar VALUES (:1)", "baz")
self.assertEqual(self.db.execute("SELECT * FROM bar"), [{"firstname": "baz"}])

Expand All @@ -308,9 +271,51 @@ def test_numeric(self):
def test_cte(self):
self.assertEqual(self.db.execute("WITH foo AS ( SELECT 1 AS bar ) SELECT bar FROM foo"), [{"bar": 1}])

def test_none(self):
self.db.execute("CREATE TABLE foo (val INTEGER)")
self.db.execute("SELECT * FROM foo WHERE val = ?", None)
def tearDown(self):
self.db.execute("DROP TABLE IF EXISTS cs50")
self.db.execute("DROP TABLE IF EXISTS bar")
self.db.execute("DROP TABLE IF EXISTS foo")

class MySQLTests(SQLTests):
@classmethod
def setUpClass(self):
self.db = SQL("mysql://[email protected]/test")

def setUp(self):
self.db.execute("CREATE TABLE IF NOT EXISTS cs50 (id INTEGER NOT NULL AUTO_INCREMENT, val VARCHAR(16), bin BLOB, PRIMARY KEY (id))")
self.db.execute("DELETE FROM cs50")

class PostgresTests(SQLTests):
@classmethod
def setUpClass(self):
self.db = SQL("postgresql://postgres:[email protected]/test")

def setUp(self):
self.db.execute("CREATE TABLE IF NOT EXISTS cs50 (id SERIAL PRIMARY KEY, val VARCHAR(16), bin BYTEA)")
self.db.execute("DELETE FROM cs50")

def test_cte(self):
self.assertEqual(self.db.execute("WITH foo AS ( SELECT 1 AS bar ) SELECT bar FROM foo"), [{"bar": 1}])

def test_postgres_scheme(self):
db = SQL("postgres://postgres:[email protected]/test")
db.execute("SELECT 1")

class SQLiteTests(SQLTests):
@classmethod
def setUpClass(self):
open("test.db", "w").close()
self.db = SQL("sqlite:///test.db")

def setUp(self):
self.db.execute("CREATE TABLE IF NOT EXISTS cs50 (id INTEGER PRIMARY KEY, val TEXT, bin BLOB)")
self.db.execute("DELETE FROM cs50")

def test_lastrowid(self):
self.db.execute("CREATE TABLE foo(id INTEGER PRIMARY KEY AUTOINCREMENT, firstname TEXT, lastname TEXT)")
self.assertEqual(self.db.execute("INSERT INTO foo (firstname, lastname) VALUES('firstname', 'lastname')"), 1)
self.assertRaises(ValueError, self.db.execute, "INSERT INTO foo (id, firstname, lastname) VALUES(1, 'firstname', 'lastname')")
self.assertEqual(self.db.execute("INSERT OR IGNORE INTO foo (id, firstname, lastname) VALUES(1, 'firstname', 'lastname')"), None)

if __name__ == "__main__":
suite = unittest.TestSuite([
Expand Down