From 9fd963c7dd56bc2c02e1697886809e5692c168c2 Mon Sep 17 00:00:00 2001 From: killagu Date: Tue, 9 Dec 2025 12:01:57 +0800 Subject: [PATCH 1/5] feat: implement mysql execute --- src/client.ts | 25 ++++++++++++ src/connection.ts | 5 +++ src/transaction.ts | 5 +++ src/types.ts | 3 +- test/client.test.ts | 92 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 129 insertions(+), 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index 3ab1b1a..d5d621e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -137,6 +137,31 @@ export class RDSClient extends Operator { } } + async execute(sql: string, values?: object | any[], options?: QueryOptions): Promise { + let conn: RDSConnection | RDSTransaction; + let shouldReleaseConn = false; + if (options?.conn) { + conn = options.conn; + } else { + const ctx = this.#connectionStorage.getStore(); + const ctxConn = ctx?.[this.#connectionStorageKey]; + if (ctxConn) { + conn = ctxConn; + } else { + conn = await this.getConnection(); + shouldReleaseConn = true; + } + } + + try { + return await conn.execute(sql, values); + } finally { + if (shouldReleaseConn) { + (conn as RDSConnection).release(); + } + } + } + get pool() { return this.#pool; } diff --git a/src/connection.ts b/src/connection.ts index 09f2dcc..a5d2515 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -16,6 +16,7 @@ export class RDSConnection extends Operator { if (!this.conn[kWrapToRDS]) { [ 'query', + 'execute', 'beginTransaction', 'commit', 'rollback', @@ -36,6 +37,10 @@ export class RDSConnection extends Operator { return await this.conn.query(sql, values); } + async execute(sql: string, values?: object | any[]): Promise { + return await this.conn.execute(sql, values); + } + async beginTransaction() { return await this.conn.beginTransaction(); } diff --git a/src/transaction.ts b/src/transaction.ts index 9fa03fc..86ce67c 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -41,6 +41,11 @@ export class RDSTransaction extends Operator { return await this.conn!._query(sql, values); } + async execute(sql: string, values?: object | any[]): Promise { + this.#check(); + return await this.conn!.execute(sql, values); + } + #check() { if (!this.conn) { throw new Error('transaction was commit or rollback'); diff --git a/src/types.ts b/src/types.ts index b3f6956..165743e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -12,8 +12,9 @@ export interface RDSClientOptions extends PoolOptions { logging?: Logging; } -export interface PoolConnectionPromisify extends Omit { +export interface PoolConnectionPromisify extends Omit { query(sql: string, values?: any | any[] | { [param: string]: any }): Promise; + execute(sql: string, values?: any | any[] | { [param: string]: any }): Promise; beginTransaction(): Promise; commit(): Promise; rollback(): Promise; diff --git a/test/client.test.ts b/test/client.test.ts index d690d9d..c30e348 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -1617,4 +1617,96 @@ describe('test/client.test.ts', () => { }); }); + + describe('execute()', () => { + it('should execute sql with parameters', async () => { + const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-test', prefix + 'm@execute-test.com' ]); + assert.equal(result.affectedRows, 1); + assert(result.insertId > 0); + }); + + it('should execute select query', async () => { + await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-select-test', prefix + 'm@execute-select-test.com' ]); + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + [ prefix + 'm@execute-select-test.com' ]); + assert(Array.isArray(rows)); + assert.equal(rows.length, 1); + assert.equal(rows[0].name, prefix + 'execute-select-test'); + }); + + it('should execute in transaction', async () => { + const tran = await db.beginTransaction(); + try { + const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-transaction-test', prefix + 'm@execute-transaction-test.com' ], + { conn: tran }); + assert.equal(result.affectedRows, 1); + await tran.commit(); + } catch (err) { + await tran.rollback(); + throw err; + } + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + [ prefix + 'm@execute-transaction-test.com' ]); + assert.equal(rows.length, 1); + }); + + it('should execute in transaction scope', async () => { + await db.beginTransactionScope(async (tran) => { + const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-scope-test', prefix + 'm@execute-scope-test.com' ], + { conn: tran }); + assert.equal(result.affectedRows, 1); + }); + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + [ prefix + 'm@execute-scope-test.com' ]); + assert.equal(rows.length, 1); + }); + + it('should execute with connection', async () => { + const conn = await db.getConnection(); + try { + const result = await conn.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-conn-test', prefix + 'm@execute-conn-test.com' ]); + assert.equal(result.affectedRows, 1); + } finally { + conn.release(); + } + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + [ prefix + 'm@execute-conn-test.com' ]); + assert.equal(rows.length, 1); + }); + + it('should execute update query', async () => { + await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-update-test', prefix + 'm@execute-update-test.com' ]); + + const result = await db.execute('UPDATE `myrds-test-user` SET email = ? WHERE name = ?', + [ prefix + 'm@execute-updated.com', prefix + 'execute-update-test' ]); + assert.equal(result.affectedRows, 1); + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + [ prefix + 'm@execute-updated.com' ]); + assert.equal(rows.length, 1); + }); + + it('should execute delete query', async () => { + await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-delete-test', prefix + 'm@execute-delete-test.com' ]); + + const result = await db.execute('DELETE FROM `myrds-test-user` WHERE name = ?', + [ prefix + 'execute-delete-test' ]); + assert.equal(result.affectedRows, 1); + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE name = ?', + [ prefix + 'execute-delete-test' ]); + assert.equal(rows.length, 0); + }); + }); }); From 6a4127e4c62b774698c7706f2b905659644e4b0e Mon Sep 17 00:00:00 2001 From: killagu Date: Tue, 9 Dec 2025 13:26:27 +0800 Subject: [PATCH 2/5] f --- test/client.test.ts | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/test/client.test.ts b/test/client.test.ts index c30e348..489b3aa 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -1620,17 +1620,17 @@ describe('test/client.test.ts', () => { describe('execute()', () => { it('should execute sql with parameters', async () => { - const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', [ prefix + 'execute-test', prefix + 'm@execute-test.com' ]); assert.equal(result.affectedRows, 1); assert(result.insertId > 0); }); it('should execute select query', async () => { - await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', [ prefix + 'execute-select-test', prefix + 'm@execute-select-test.com' ]); - - const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', [ prefix + 'm@execute-select-test.com' ]); assert(Array.isArray(rows)); assert.equal(rows.length, 1); @@ -1640,8 +1640,8 @@ describe('test/client.test.ts', () => { it('should execute in transaction', async () => { const tran = await db.beginTransaction(); try { - const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', - [ prefix + 'execute-transaction-test', prefix + 'm@execute-transaction-test.com' ], + const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-transaction-test', prefix + 'm@execute-transaction-test.com' ], { conn: tran }); assert.equal(result.affectedRows, 1); await tran.commit(); @@ -1650,20 +1650,20 @@ describe('test/client.test.ts', () => { throw err; } - const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', [ prefix + 'm@execute-transaction-test.com' ]); assert.equal(rows.length, 1); }); it('should execute in transaction scope', async () => { - await db.beginTransactionScope(async (tran) => { - const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', - [ prefix + 'execute-scope-test', prefix + 'm@execute-scope-test.com' ], + await db.beginTransactionScope(async tran => { + const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + [ prefix + 'execute-scope-test', prefix + 'm@execute-scope-test.com' ], { conn: tran }); assert.equal(result.affectedRows, 1); }); - const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', [ prefix + 'm@execute-scope-test.com' ]); assert.equal(rows.length, 1); }); @@ -1671,40 +1671,40 @@ describe('test/client.test.ts', () => { it('should execute with connection', async () => { const conn = await db.getConnection(); try { - const result = await conn.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + const result = await conn.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', [ prefix + 'execute-conn-test', prefix + 'm@execute-conn-test.com' ]); assert.equal(result.affectedRows, 1); } finally { conn.release(); } - const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', [ prefix + 'm@execute-conn-test.com' ]); assert.equal(rows.length, 1); }); it('should execute update query', async () => { - await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', [ prefix + 'execute-update-test', prefix + 'm@execute-update-test.com' ]); - const result = await db.execute('UPDATE `myrds-test-user` SET email = ? WHERE name = ?', + const result = await db.execute('UPDATE `myrds-test-user` SET email = ? WHERE name = ?', [ prefix + 'm@execute-updated.com', prefix + 'execute-update-test' ]); assert.equal(result.affectedRows, 1); - const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?', [ prefix + 'm@execute-updated.com' ]); assert.equal(rows.length, 1); }); it('should execute delete query', async () => { - await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', + await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())', [ prefix + 'execute-delete-test', prefix + 'm@execute-delete-test.com' ]); - const result = await db.execute('DELETE FROM `myrds-test-user` WHERE name = ?', + const result = await db.execute('DELETE FROM `myrds-test-user` WHERE name = ?', [ prefix + 'execute-delete-test' ]); assert.equal(result.affectedRows, 1); - const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE name = ?', + const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE name = ?', [ prefix + 'execute-delete-test' ]); assert.equal(rows.length, 0); }); From 7abe54bc6cf3fccd371a63fc11f2d388cb6d1fd8 Mon Sep 17 00:00:00 2001 From: killagu Date: Tue, 9 Dec 2025 13:39:17 +0800 Subject: [PATCH 3/5] refactor: optimize execute method implementation and improve code reuse - Move execute method implementation from RDSConnection to Operator base class - Add _execute abstract method in Operator for consistency with _query - Extract common execution logic into private #executeWithHooks method - Reduce code duplication between query and execute methods - Maintain all existing functionality (hooks, logging, monitoring, events) - Simplify RDSConnection by removing complex execute implementation --- src/connection.ts | 2 +- src/operator.ts | 48 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/src/connection.ts b/src/connection.ts index a5d2515..2208116 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -37,7 +37,7 @@ export class RDSConnection extends Operator { return await this.conn.query(sql, values); } - async execute(sql: string, values?: object | any[]): Promise { + async _execute(sql: string, values?: object | any[]) { return await this.conn.execute(sql, values); } diff --git a/src/operator.ts b/src/operator.ts index 7b9fbe3..b3854d0 100644 --- a/src/operator.ts +++ b/src/operator.ts @@ -74,7 +74,30 @@ export abstract class Operator { } async query(sql: string, values?: object | any[]): Promise { - // query(sql, values) + return this.#executeWithHooks(sql, values, 'query', this._query.bind(this)); + } + + async queryOne(sql: string, values?: object | any[]) { + const rows = await this.query(sql, values); + return rows && rows[0] || null; + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + protected async _query(_sql: string, _values?: object | any[]): Promise { + throw new Error('SubClass must impl this'); + } + + async execute(sql: string, values?: object | any[]): Promise { + return this.#executeWithHooks(sql, values, 'execute', this._execute.bind(this)); + } + + async #executeWithHooks( + sql: string, + values: object | any[] | undefined, + operation: string, + executor: (sql: string, values?: object | any[]) => Promise, + ): Promise { + // 处理前置钩子 if (this.beforeQueryHandlers.length > 0) { for (const beforeQueryHandler of this.beforeQueryHandlers) { const newSql = beforeQueryHandler(sql, values); @@ -83,30 +106,35 @@ export abstract class Operator { } } } - debug('[connection#%s] query %o', this.threadId, sql); + + debug('[connection#%s] %s %o', this.threadId, operation, sql); if (typeof this.logging === 'function') { this.logging(sql, { threadId: this.threadId }); } + const queryStart = performance.now(); let rows: any; let lastError: Error | undefined; + channels.queryStart.publish({ sql, values, connection: this.#connection, } as QueryStartMessage); + try { - rows = await this._query(sql, values); + rows = await executor(sql, values); + if (Array.isArray(rows)) { - debug('[connection#%s] query get %o rows', this.threadId, rows.length); + debug('[connection#%s] %s get %o rows', this.threadId, operation, rows.length); } else { - debug('[connection#%s] query result: %o', this.threadId, rows); + debug('[connection#%s] %s result: %o', this.threadId, operation, rows); } return rows; } catch (err) { lastError = err; err.stack = `${err.stack}\n sql: ${sql}`; - debug('[connection#%s] query error: %o', this.threadId, err); + debug('[connection#%s] %s error: %o', this.threadId, operation, err); throw err; } finally { const duration = Math.floor((performance.now() - queryStart) * 1000) / 1000; @@ -117,6 +145,7 @@ export abstract class Operator { duration, error: lastError, } as QueryEndMessage); + if (this.afterQueryHandlers.length > 0) { for (const afterQueryHandler of this.afterQueryHandlers) { afterQueryHandler(sql, rows, duration, lastError, values); @@ -125,13 +154,8 @@ export abstract class Operator { } } - async queryOne(sql: string, values?: object | any[]) { - const rows = await this.query(sql, values); - return rows && rows[0] || null; - } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - protected async _query(_sql: string, _values?: object | any[]): Promise { + protected async _execute(_sql: string, _values?: object | any[]): Promise { throw new Error('SubClass must impl this'); } From 8c072e06573280fffe32650aeb5093b491fcca65 Mon Sep 17 00:00:00 2001 From: killagu Date: Tue, 9 Dec 2025 13:41:08 +0800 Subject: [PATCH 4/5] f --- src/transaction.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index 86ce67c..02c37cd 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -41,9 +41,9 @@ export class RDSTransaction extends Operator { return await this.conn!._query(sql, values); } - async execute(sql: string, values?: object | any[]): Promise { + async _execute(sql: string, values?: object | any[]) { this.#check(); - return await this.conn!.execute(sql, values); + return await this.conn!._execute(sql, values); } #check() { From 9d89a05c2ce899858e48b012c6fcece9348693fd Mon Sep 17 00:00:00 2001 From: killagu Date: Tue, 9 Dec 2025 13:46:25 +0800 Subject: [PATCH 5/5] f --- src/client.ts | 61 ++++++++++++++++++++++----------------------------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/src/client.ts b/src/client.ts index d5d621e..50b85b6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -79,6 +79,7 @@ export class RDSClient extends Operator { 'query', 'getConnection', 'end', + 'execute', ].forEach(method => { this.#pool[method] = promisify(this.#pool[method]); }); @@ -113,53 +114,43 @@ export class RDSClient extends Operator { } async query(sql: string, values?: object | any[], options?: QueryOptions): Promise { - let conn: RDSConnection | RDSTransaction; - let shouldReleaseConn = false; - if (options?.conn) { - conn = options.conn; - } else { - const ctx = this.#connectionStorage.getStore(); - const ctxConn = ctx?.[this.#connectionStorageKey]; - if (ctxConn) { - conn = ctxConn; - } else { - conn = await this.getConnection(); - shouldReleaseConn = true; - } - } + return this.#executeWithConnection('query', sql, values, options); + } + + async execute(sql: string, values?: object | any[], options?: QueryOptions): Promise { + return this.#executeWithConnection('execute', sql, values, options); + } + + async #executeWithConnection( + method: 'query' | 'execute', + sql: string, + values?: object | any[], + options?: QueryOptions, + ): Promise { + const { conn, shouldRelease } = await this.#getConnection(options); try { - return await conn.query(sql, values); + return await conn[method](sql, values); } finally { - if (shouldReleaseConn) { + if (shouldRelease) { (conn as RDSConnection).release(); } } } - async execute(sql: string, values?: object | any[], options?: QueryOptions): Promise { - let conn: RDSConnection | RDSTransaction; - let shouldReleaseConn = false; + async #getConnection(options?: QueryOptions): Promise<{ conn: RDSConnection | RDSTransaction; shouldRelease: boolean }> { if (options?.conn) { - conn = options.conn; - } else { - const ctx = this.#connectionStorage.getStore(); - const ctxConn = ctx?.[this.#connectionStorageKey]; - if (ctxConn) { - conn = ctxConn; - } else { - conn = await this.getConnection(); - shouldReleaseConn = true; - } + return { conn: options.conn, shouldRelease: false }; } - try { - return await conn.execute(sql, values); - } finally { - if (shouldReleaseConn) { - (conn as RDSConnection).release(); - } + const ctx = this.#connectionStorage.getStore(); + const ctxConn = ctx?.[this.#connectionStorageKey]; + if (ctxConn) { + return { conn: ctxConn, shouldRelease: false }; } + + const conn = await this.getConnection(); + return { conn, shouldRelease: true }; } get pool() {