Skip to content

refactor: Improve Postgres init to reuse connection #8663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: alpha
Choose a base branch
from
77 changes: 36 additions & 41 deletions src/Adapters/Storage/Postgres/PostgresStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
this._onchange = callback;
}

//Note that analyze=true will run the query, executing INSERTS, DELETES, etc.
// Note that analyze=true will run the query, executing INSERTS, DELETES, etc.
createExplainableQuery(query: string, analyze: boolean = false) {
if (analyze) {
return 'EXPLAIN (ANALYZE, FORMAT JSON) ' + query;
Expand Down Expand Up @@ -911,7 +911,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
this._stream
.none('NOTIFY $1~, $2', ['schema.change', { senderId: this._uuid }])
.catch(error => {
console.log('Failed to Notify:', error); // unlikely to ever happen
console.error('Failed to Notify: ', error); // unlikely to ever happen
});
}
}
Expand Down Expand Up @@ -1020,11 +1020,11 @@ export class PostgresStorageAdapter implements StorageAdapter {
await this.setIndexesWithSchemaFormat(className, schema.indexes, {}, schema.fields, t);
return toParseSchema(schema);
})
.catch(err => {
if (err.code === PostgresUniqueIndexViolationError && err.detail.includes(className)) {
.catch(error => {
if (error.code === PostgresUniqueIndexViolationError && error.detail.includes(className)) {
throw new Parse.Error(Parse.Error.DUPLICATE_VALUE, `Class ${className} already exists.`);
}
throw err;
throw error;
});
this._notifySchemaChange();
return parseSchema;
Expand Down Expand Up @@ -1444,18 +1444,18 @@ export class PostgresStorageAdapter implements StorageAdapter {
.then(() => ({ ops: [object] }))
.catch(error => {
if (error.code === PostgresUniqueIndexViolationError) {
const err = new Parse.Error(
const parseError = new Parse.Error(
Parse.Error.DUPLICATE_VALUE,
'A duplicate value for a field with unique values was provided'
);
err.underlyingError = error;
parseError.underlyingError = error;
if (error.constraint) {
const matches = error.constraint.match(/unique_([a-zA-Z]+)/);
if (matches && Array.isArray(matches)) {
err.userInfo = { duplicated_field: matches[1] };
parseError.userInfo = { duplicated_field: matches[1] };
}
}
error = err;
error = parseError;
}
throw error;
});
Expand Down Expand Up @@ -1627,11 +1627,6 @@ export class PostgresStorageAdapter implements StorageAdapter {
);
values.push(fieldName, JSON.stringify(fieldValue.objects));
index += 2;
} else if (fieldName === 'updatedAt') {
//TODO: stop special casing this. It should check for __type === 'Date' and use .iso
updatePatterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue);
index += 2;
} else if (typeof fieldValue === 'string') {
updatePatterns.push(`$${index}:name = $${index + 1}`);
values.push(fieldName, fieldValue);
Expand Down Expand Up @@ -1938,7 +1933,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
};
}
});
//TODO: remove this reliance on the mongo format. DB adapter shouldn't know there is a difference between created at and any other date field.
// TODO: remove this reliance on the mongo format. DB adapter shouldn't know there is a difference between created at and any other date field.
if (object.createdAt) {
object.createdAt = object.createdAt.toISOString();
}
Expand Down Expand Up @@ -2003,6 +1998,7 @@ export class PostgresStorageAdapter implements StorageAdapter {
return this._client.none(qs, [className, constraintName, ...fieldNames]).catch(error => {
if (error.code === PostgresDuplicateRelationError && error.message.includes(constraintName)) {
// Index already exists. Ignore error.
return;
} else if (
error.code === PostgresUniqueIndexViolationError &&
error.message.includes(constraintName)
Expand Down Expand Up @@ -2336,37 +2332,36 @@ export class PostgresStorageAdapter implements StorageAdapter {
}

async performInitialization({ VolatileClassesSchemas }: any) {
// TODO: This method needs to be rewritten to make proper use of connections (@vitaly-t)
debug('performInitialization');
await this._ensureSchemaCollectionExists();
const promises = VolatileClassesSchemas.map(schema => {
return this.createTable(schema.className, schema)
.catch(err => {
if (
err.code === PostgresDuplicateRelationError ||
err.code === Parse.Error.INVALID_CLASS_NAME
) {
return Promise.resolve();
return this._client
.tx('perform-initialization', async t => {
for (const schema of VolatileClassesSchemas) {
try {
await this.createTable(schema.className, schema, t);
} catch (error) {
if (
!(
error.code === PostgresDuplicateRelationError ||
error.code === Parse.Error.INVALID_CLASS_NAME
)
) {
throw error;
}
}
throw err;
})
.then(() => this.schemaUpgrade(schema.className, schema));
});
promises.push(this._listenToSchema());
return Promise.all(promises)
.then(() => {
return this._client.tx('perform-initialization', async t => {
await t.none(sql.misc.jsonObjectSetKeys);
await t.none(sql.array.add);
await t.none(sql.array.addUnique);
await t.none(sql.array.remove);
await t.none(sql.array.containsAll);
await t.none(sql.array.containsAllRegex);
await t.none(sql.array.contains);
return t.ctx;
});
this.schemaUpgrade(schema.className, schema, t);
}
await t.none(sql.misc.jsonObjectSetKeys);
await t.none(sql.array.add);
await t.none(sql.array.addUnique);
await t.none(sql.array.remove);
await t.none(sql.array.containsAll);
await t.none(sql.array.containsAllRegex);
await t.none(sql.array.contains);
return t.ctx;
})
.then(ctx => {
this._listenToSchema();
debug(`initializationDone in ${ctx.duration}`);
})
.catch(error => {
Expand Down