Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 42 additions & 20 deletions src/MassiveActionHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,48 @@ export class MassiveActionHandler extends AbstractActionHandler {
* throw an error.
*
* @param sequenceName The name of the MigrationSequence to be run.
* @param pgp pg-promise instance
* @param initial True if this is the first migration
*/
public async migrate(
sequenceName: string,
pgp: IDatabase<{}> = this.massiveInstance.instance,
initial: boolean = false,
): Promise<void> {
this.log.info(`Migrating database with Migration Sequence '${sequenceName}'...`)
const migrateStart = Date.now()
const migrationSequence = this.migrationSequenceByName[sequenceName]
if (!migrationSequence) {
throw new NonExistentMigrationError(sequenceName)
}
let ranMigrations: Migration[] = []
if (!initial) {
ranMigrations = await this.loadRanMigrations()
ranMigrations = await this.loadAppliedMigrations()
}
const extendedMigrations = ranMigrations.concat(migrationSequence.migrations)
const migrationRunner = new MigrationRunner(this.massiveInstance.instance, extendedMigrations, this.dbSchema, true)
const migrationRunner = new MigrationRunner(
this.massiveInstance.instance,
extendedMigrations,
this.dbSchema,
this.log,
true
)
await migrationRunner.migrate(
migrationSequence.sequenceName,
this.lastProcessedBlockNumber + 1,
pgp,
initial,
)
await this.massiveInstance.reload()
const migrateTime = Date.now() - migrateStart
this.log.info(`Migrated database with Migration Sequence ${sequenceName} (${migrateTime}ms)`)
}

/**
* Sets up the database by idempotently creating the schema, installing CyanAudit, creates internally used tables, and
* runs any initial migration sequences provided.
*
* @param initSequenceName The name of the MigrationSequence to be used to migrate the database initially
*/
protected async setup(initSequenceName: string = 'init'): Promise<void> {
if (this.initialized) {
Expand All @@ -100,15 +114,15 @@ export class MassiveActionHandler extends AbstractActionHandler {

if (!this.migrationSequenceByName[initSequenceName]) {
if (initSequenceName === 'init') {
this.log.warn(`No 'init' Migration sequence was provided, nor was a different initSequenceName.` +
'No initial migrations have been run.')
this.log.warn(`No 'init' Migration sequence was provided, nor was a different initSequenceName. ` +
'No initial migrations have been run.')
} else {
throw new NonExistentMigrationError(initSequenceName)
}
}

try {
const migrationRunner = new MigrationRunner(this.massiveInstance.instance, [], this.dbSchema)
const migrationRunner = new MigrationRunner(this.massiveInstance.instance, [], this.dbSchema, this.log)
await migrationRunner.setup()
await this.massiveInstance.reload()
await this.migrate(initSequenceName, this.massiveInstance.instance, true)
Expand All @@ -126,16 +140,12 @@ export class MassiveActionHandler extends AbstractActionHandler {
}

protected async handleWithState(handle: (state: any, context?: any) => void): Promise<void> {
const indexState = await this.loadIndexState()
const { lastIrreversibleBlockNumber, blockNumber } = indexState
if (blockNumber < lastIrreversibleBlockNumber) {
if (this.lastProcessedBlockNumber < this.lastIrreversibleBlockNumber) {
await this.turnOffCyanAudit()
const db = this.schemaInstance
await handle(db)
} else {
await this.turnOnCyanAudit()
await this.handleBlockWithTransactionId(handle)
}
await this.handleBlockWithTransactionId(handle)
}

protected async updateIndexState(
Expand Down Expand Up @@ -182,7 +192,9 @@ export class MassiveActionHandler extends AbstractActionHandler {
}
}

protected async loadRanMigrations(): Promise<Migration[]> {
protected async loadAppliedMigrations(): Promise<Migration[]> {
this.log.debug('Loading applied run migrations...')
const loadStart = Date.now()
const processedMigrationRows = await this.massiveInstance._migration.find()
const processedMigrations = processedMigrationRows.map((row: any) => {
return {
Expand All @@ -198,8 +210,11 @@ export class MassiveActionHandler extends AbstractActionHandler {
if (expectedName !== actualName) {
throw new MismatchedMigrationsError(expectedName, actualName, index)
}
this.log.debug(`Previously applied migration name and index matches expected: ${index} -- ${expectedName}`)
ranMigrations.push(this.allMigrations[index])
}
const loadTime = Date.now() - loadStart
this.log.debug(`Loaded ${ranMigrations.length} previously applied migrations (${loadTime}ms)`)
return ranMigrations
}

Expand All @@ -215,10 +230,9 @@ export class MassiveActionHandler extends AbstractActionHandler {
},
)
for (const { block_number: rollbackNumber, txid } of blockNumberTxIds) {
this.log.info(`ROLLING BACK BLOCK ${rollbackNumber}`)
this.log.debug(`Rolling back block ${rollbackNumber} (undoing database transaction ${txid})...`)
await this.massiveInstance.cyanaudit.fn_undo_transaction(txid)
}
this.log.info(`Rollback complete!`)
}

private warnOverwrite(db: any, toOverwrite: string): void {
Expand All @@ -231,11 +245,14 @@ export class MassiveActionHandler extends AbstractActionHandler {
private async turnOnCyanAudit(): Promise<void> {
if (!this.cyanauditEnabled) {
try {
this.log.debug('Turning on CyanAudit...')
const turnOnStart = Date.now()
await this.massiveInstance.query('SET cyanaudit.enabled = 1;')
this.cyanauditEnabled = true
this.log.info('Cyan Audit enabled!')
} catch (e) {
this.log.error('Error: ', e)
const turnOnTime = Date.now() - turnOnStart
this.log.info(`Turned on CyanAudit (${turnOnTime}ms)`)
} catch (err) {
this.log.error(err)
throw new CyanAuditError(true)
}
}
Expand All @@ -244,9 +261,13 @@ export class MassiveActionHandler extends AbstractActionHandler {
private async turnOffCyanAudit(): Promise<void> {
if (this.cyanauditEnabled) {
try {
this.log.debug('Turning off CyanAudit...')
const turnOffStart = Date.now()
await this.massiveInstance.query('SET cyanaudit.enabled = 0;')
this.cyanauditEnabled = false
this.log.info('Cyan Audit disabled!')
this.log.info('Turned off CyanAudit')
const turnOffTime = Date.now() - turnOffStart
this.log.info(`Turned off CyanAudit (${turnOffTime}ms)`)
} catch (e) {
this.log.error('Error: ', e)
throw new CyanAuditError(false)
Expand All @@ -268,8 +289,9 @@ export class MassiveActionHandler extends AbstractActionHandler {
db.txid = (await tx.instance.one('select txid_current()')).txid_current
try {
await handle(db)
} catch (e) {
throw e // Throw error to trigger ROLLBACK
} catch (err) {
this.log.debug('Error thrown in updater, triggering rollback')
throw err
}
}, {
mode: new this.massiveInstance.pgp.txMode.TransactionMode({
Expand Down
8 changes: 8 additions & 0 deletions src/MigrationRunner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Docker from 'dockerode'
import massive from 'massive'
import * as path from 'path'
import { IDatabase } from 'pg-promise'
import * as Logger from 'bunyan'
import { Migration } from './Migration'
import { MigrationRunner } from './MigrationRunner'
import * as dockerUtils from './testHelpers/docker'
Expand All @@ -14,6 +15,8 @@ const dbPass = 'docker'

const baseDir = path.join(path.resolve('./'), 'src')

const log = Logger.createLogger({ name: 'TestMigrationRunner', level: 'debug' })

class TestMigrationRunner extends MigrationRunner {
public async _checkOrCreateSchema() { await this.checkOrCreateSchema() }
public async _checkOrCreateTables() { await this.checkOrCreateTables() }
Expand Down Expand Up @@ -80,6 +83,7 @@ describe('Database setup', () => {
massiveInstance.instance,
[],
'newschema',
log,
)
await runner._checkOrCreateSchema()
await runner._checkOrCreateTables() // Schema needs something inside to be seen by Massive
Expand All @@ -91,6 +95,8 @@ describe('Database setup', () => {
const runner = new TestMigrationRunner(
massiveInstance.instance,
[],
'public',
log,
)
await runner.setup()
await massiveInstance.reload()
Expand All @@ -105,6 +111,7 @@ describe('Database setup', () => {
massiveInstance.instance,
[],
'doesntexist',
log,
)
const schemaError = Error(
`Schema 'doesntexist' does not exist. Make sure you have run \`setup()\` before migrating`,
Expand Down Expand Up @@ -160,6 +167,7 @@ describe('MigrationRunner', () => {
massiveInstance.instance,
migrations,
schemaName,
log,
)
await runner.setup()
await massiveInstance.reload()
Expand Down
20 changes: 20 additions & 0 deletions src/MigrationRunner.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as path from 'path'
import * as Logger from 'bunyan'
import { IDatabase } from 'pg-promise'
import {
ExtraMigrationHistoryError,
Expand All @@ -15,6 +16,7 @@ export class MigrationRunner {
protected pgp: IDatabase<{}>,
protected migrations: Migration[],
protected schemaName: string = 'public',
protected log: Logger,
skipSetup = false,
) {
const migrationNames = migrations.map((f) => f.name)
Expand All @@ -28,10 +30,12 @@ export class MigrationRunner {
}

public async setup() {
this.log.debug('Setting up Migration Runner...')
await this.checkOrCreateSchema()
await this.checkOrCreateTables()
await this.installCyanAudit()
this.isSetUp = true
this.log.debug('Set up Migration Runner')
}

public async migrate(
Expand All @@ -48,14 +52,20 @@ export class MigrationRunner {
}

protected async applyMigration(pgp: IDatabase<{}>, migration: Migration, sequenceName: string, blockNumber: number) {
const applyStart = Date.now()
this.log.debug(`Applying migration '${migration.name}'...`)
await migration.up(pgp)
await this.refreshCyanAudit()
await this.registerMigration(pgp, migration.name, sequenceName, blockNumber)
const applyTime = Date.now() - applyStart
this.log.info(`Applied migration '${migration.name}' (${applyTime}ms)`)
}

// public async revertTo(migrationName) {} // Down migrations

protected async checkOrCreateTables() {
this.log.debug(`Creating internally-needed tables if they don't already exist...`)
const createStart = Date.now()
await this.pgp.none(`
CREATE TABLE IF NOT EXISTS $1:raw._migration(
id serial PRIMARY KEY,
Expand All @@ -82,22 +92,32 @@ export class MigrationRunner {
txid bigint NOT NULL
);
`, [this.schemaName])
const createTime = Date.now() - createStart
this.log.debug(`Created internally-needed tables if they didn't already exist (${createTime}ms)`)
}

protected async checkOrCreateSchema() {
this.log.debug(`Creating schema '${this.schemaName}' if it doesn't already exist...`)
const createStart = Date.now()
await this.pgp.none(`
CREATE SCHEMA IF NOT EXISTS $1:raw;
`, [this.schemaName])
const createTime = Date.now() - createStart
this.log.debug(`Created schema '${this.schemaName}' if it didn't already exist (${createTime}ms)`)
}

protected async installCyanAudit() {
this.log.debug('Installing CyanAudit to database...')
const installStart = Date.now()
const cyanaudit = new Migration('', '', path.join(__dirname, 'cyanaudit/cyanaudit--2.2.0.sql'))
await cyanaudit.up(this.pgp)

const cyanauditExt = new Migration('', '', path.join(__dirname, 'cyanaudit/cyanaudit-ext.sql'))
await cyanauditExt.up(this.pgp)

await this.refreshCyanAudit()
const installTime = Date.now() - installStart
this.log.info(`Installed CyanAudit to database (${installTime}ms)`)
}

protected async refreshCyanAudit(pgp: IDatabase<{}> = this.pgp) {
Expand Down