一、 问题背景

创建db时遇到以下报错:

postgres=# create database pgdb;
ERROR:  source database "template1" is being accessed by other users
DETAIL:  There is 1 other session using the database.

       解决方法其实比较简单:先查询是谁连到了template1。正常业务不应该连到这个库上,因此与业务方确认后kill掉该连接,并排查对应的功能即可。

postgres=# \x
Expanded display is on.

postgres=# select * from pg_stat_activity where datname = 'template1';
-[ RECORD 1 ]----+------------------------------
datid            | 1
datname          | template1
pid              | 18223
leader_pid       | 
usesysid         | 10
usename          | postgres
application_name | psql
client_addr      | 
client_hostname  | 
client_port      | -1
backend_start    | 2023-12-13 15:33:12.481628+08
xact_start       | 
query_start      | 
state_change     | 2023-12-13 15:33:12.494424+08
wait_event_type  | Client
wait_event       | ClientRead
state            | idle
backend_xid      | 
backend_xmin     | 
query_id         | 
query            | 
backend_type     | client backend

        这个测试环境里的连接是我自己建立的,直接kill掉即可:

postgres=# select pg_terminate_backend(18223);
-[ RECORD 1 ]--------+--
pg_terminate_backend | t

postgres=# create database pgdb;
CREATE DATABASE

        查阅官方文档:主要限制是,在复制源数据库(模板库)期间,不能有其他会话连接到该库。如果CREATE DATABASE开始执行时模板库上已存在其他连接,命令会直接失败;在复制操作进行期间,新的连接也会被禁止连到模板库。

The principal limitation is that no other sessions can be connected to the source database while it is being copied. CREATE DATABASE will fail if any other connection exists when it starts; during the copy operation, new connections to the source database are prevented.

       那么PG数据库的创建过程到底会干些什么?其对应源码中函数为createdb(dbcommands.c文件),在正式看之前,有一些预备知识。

二、 createdb 预备知识

1. 创建语法 

首先看数据库的创建语法,其实就是两项 —— DB名,创建选项

CREATE DATABASE name
    [ WITH ] [ OWNER [=] user_name ]
           [ TEMPLATE [=] template ]
           [ ENCODING [=] encoding ]
           [ LOCALE [=] locale ]
           [ LC_COLLATE [=] lc_collate ]
           [ LC_CTYPE [=] lc_ctype ]
           [ TABLESPACE [=] tablespace_name ]
           [ ALLOW_CONNECTIONS [=] allowconn ]
           [ CONNECTION LIMIT [=] connlimit ]
           [ IS_TEMPLATE [=] istemplate ]

2. 函数定义和参数

Oid createdb(ParseState *pstate, const CreatedbStmt *stmt)
  • Oid:表示创建完成后会返回新db的oid
  • ParseState:用于记录语义分析状态的结构体
  • CreatedbStmt:即执行的create database语句,主要包括DB名和选项
/* ----------------------
 *		Createdb Statement
 *
 * Node describing a CREATE DATABASE command as handed to createdb():
 * the name of the database to create plus the list of options
 * (OWNER, TEMPLATE, ENCODING, ...) written in the statement.
 * ----------------------
 */
typedef struct CreatedbStmt
{
	NodeTag		type;
	char	   *dbname;			/* name of database to create */
	List	   *options;		/* List of DefElem nodes */
} CreatedbStmt;

3. createdb选项

       从上面可以看出,options其实是个链表,链表的每个节点是DefElem结构体。DefElem 本质上就是一个name = value的键值对:*defname是键,*arg是值。

/*
 * DefElem - a generic "name = value" option definition
 *
 * In some contexts the name can be qualified.  Also, certain SQL commands
 * allow a SET/ADD/DROP action to be attached to option settings, so it's
 * convenient to carry a field for that too.  (Note: currently, it is our
 * practice that the grammar allows namespace and action only in statements
 * where they are relevant; C code can just ignore those fields in other
 * statements.)
 */
/* Action that may be attached to an option setting; see DefElem.defaction */
typedef enum DefElemAction
{
	DEFELEM_UNSPEC,				/* no action given */
	DEFELEM_SET,				/* SET action attached to the option */
	DEFELEM_ADD,				/* ADD action attached to the option */
	DEFELEM_DROP				/* DROP action attached to the option */
} DefElemAction;

/* One "name = value" option; createdb() receives these via CreatedbStmt.options */
typedef struct DefElem
{
	NodeTag		type;
	char	   *defnamespace;	/* NULL if unqualified name */
	char	   *defname;		/* option name (the "name" in name = value) */
	Node	   *arg;			/* option value: a (Value *) or a (TypeName *);
								 * createdb() tests it for NULL before use */
	DefElemAction defaction;	/* unspecified action, or SET/ADD/DROP */
	int			location;		/* token location, or -1 if unknown */
} DefElem;

三、 数据库创建过程

下面正式来看函数的内容

1. 选项检查

strcmp是字符串比较函数,基本形式为strcmp(str1,str2):

  • 若str1=str2,则返回零
  • 若str1<str2,则返回负数
  • 若str1>str2,则返回正数

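因此这部分实际就是遍历创建语句中的每个选项,用strcmp按选项名逐一匹配:同一选项重复出现会报"conflicting or redundant options",无法识别的选项名会报"not recognized",已废弃的LOCATION选项只给出WARNING;循环结束后再检查选项之间是否冲突(例如LOCALE不能与LC_COLLATE或LC_CTYPE同时指定)。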

/*
 * CREATE DATABASE
 */
Oid
createdb(ParseState *pstate, const CreatedbStmt *stmt)
{
	TableScanDesc scan;
	Relation	rel;
	Oid			src_dboid;
	Oid			src_owner;
...
	Oid			dboid;
	Oid			datdba;
	ListCell   *option;
	DefElem    *dtablespacename = NULL;
	DefElem    *downer = NULL;
	DefElem    *dtemplate = NULL;
	DefElem    *dencoding = NULL;
...
	char	   *dbname = stmt->dbname;
	char	   *dbowner = NULL;
...
	createdb_failure_params fparms;

	/* Extract options from the statement node tree */
	foreach(option, stmt->options)
	{
		DefElem    *defel = (DefElem *) lfirst(option);

		if (strcmp(defel->defname, "tablespace") == 0)
		{
			if (dtablespacename)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			dtablespacename = defel;
		}
		else if (strcmp(defel->defname, "owner") == 0)
		{
			if (downer)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			downer = defel;
		}
...
		else if (strcmp(defel->defname, "location") == 0)
		{
			ereport(WARNING,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("LOCATION is not supported anymore"),
					 errhint("Consider using tablespaces instead."),
					 parser_errposition(pstate, defel->location)));
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("option \"%s\" not recognized", defel->defname),
					 parser_errposition(pstate, defel->location)));
	}

	if (dlocale && (dcollate || dctype))
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("conflicting or redundant options"),
				 errdetail("LOCALE cannot be specified together with LC_COLLATE or LC_CTYPE.")));
...

 检查选项值是否有效:例如指定的字符集(编码)必须是有效的服务端编码、连接数限制不能小于-1(-1表示不限制)等

	if (downer && downer->arg)
		dbowner = defGetString(downer);
	if (dtemplate && dtemplate->arg)
		dbtemplate = defGetString(dtemplate);
	if (dencoding && dencoding->arg)
	{
		const char *encoding_name;

		if (IsA(dencoding->arg, Integer))
		{
			encoding = defGetInt32(dencoding);
			encoding_name = pg_encoding_to_char(encoding);
			if (strcmp(encoding_name, "") == 0 ||
				pg_valid_server_encoding(encoding_name) < 0)
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_OBJECT),
						 errmsg("%d is not a valid encoding code",
								encoding),
						 parser_errposition(pstate, dencoding->location)));
		}
		else
		{
			encoding_name = defGetString(dencoding);
			encoding = pg_valid_server_encoding(encoding_name);
			if (encoding < 0)
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_OBJECT),
						 errmsg("%s is not a valid encoding name",
								encoding_name),
						 parser_errposition(pstate, dencoding->location)));
		}
	}
...
	if (dconnlimit && dconnlimit->arg)
	{
		dbconnlimit = defGetInt32(dconnlimit);
		if (dbconnlimit < -1)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("invalid connection limit: %d", dbconnlimit)));
	}

最后检查是否有建db的权限

	/* obtain OID of proposed owner */
	if (dbowner)
		datdba = get_role_oid(dbowner, false);
	else
		datdba = GetUserId();

	/*
	 * To create a database, must have createdb privilege and must be able to
	 * become the target role (this does not imply that the target role itself
	 * must have createdb privilege).  The latter provision guards against
	 * "giveaway" attacks.  Note that a superuser will always have both of
	 * these privileges a fortiori.
	 */
	if (!have_createdb_privilege())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied to create database")));

	check_is_member_of_role(GetUserId(), datdba);

2. 复制模板库

       若不指定模板,默认使用template1。对模板库仅加共享锁(ShareLock):这样多个create database语句可以基于同一个模板并发执行,同时又能防止复制期间有人并行删除模板库而得到一个不完整的新数据库;这把锁还会阻止新的连接在复制完成前连到模板库。

       首先还是一些预检查,包括模板库是否存在、是否有权限复制、参数设置检查等。

	/*
	 * Lookup database (template) to be cloned, and obtain share lock on it.
	 * ShareLock allows two CREATE DATABASEs to work from the same template
	 * concurrently, while ensuring no one is busy dropping it in parallel
	 * (which would be Very Bad since we'd likely get an incomplete copy
	 * without knowing it).  This also prevents any new connections from being
	 * made to the source until we finish copying it, so we can be sure it
	 * won't change underneath us.
	 */
	if (!dbtemplate)
		dbtemplate = "template1";	/* Default template database name */

	if (!get_db_info(dbtemplate, ShareLock,
					 &src_dboid, &src_owner, &src_encoding,
					 &src_istemplate, &src_allowconn, &src_lastsysoid,
					 &src_frozenxid, &src_minmxid, &src_deftablespace,
					 &src_collate, &src_ctype))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("template database \"%s\" does not exist",
						dbtemplate)));

	/*
	 * Permission check: to copy a DB that's not marked datistemplate, you
	 * must be superuser or the owner thereof.
	 */
	if (!src_istemplate)
	{
		if (!pg_database_ownercheck(src_dboid, GetUserId()))
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
					 errmsg("permission denied to copy database \"%s\"",
							dbtemplate)));
	}

	/* If encoding or locales are defaulted, use source's setting */
	if (encoding < 0)
		encoding = src_encoding;
	if (dbcollate == NULL)
		dbcollate = src_collate;
	if (dbctype == NULL)
		dbctype = src_ctype;

	/* Some encodings are client only */
	if (!PG_VALID_BE_ENCODING(encoding))
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("invalid server encoding %d", encoding)));

...

	check_encoding_locale_matches(encoding, dbcollate, dbctype);

	if (strcmp(dbtemplate, "template0") != 0)
	{
		if (encoding != src_encoding)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
							pg_encoding_to_char(encoding),
							pg_encoding_to_char(src_encoding)),
					 errhint("Use the same encoding as in the template database, or use template0 as template.")));
...
	}

为新数据库设置默认表空间

/* Resolve default tablespace for new database */
if (dtablespacename && dtablespacename->arg)
{
    char       *tablespacename;
    AclResult   aclresult;

    // 获取表空间名称
    tablespacename = defGetString(dtablespacename);
    // 获取表空间的OID
    dst_deftablespace = get_tablespace_oid(tablespacename, false);

    /* check permissions */
    // 检查权限
    aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(), ACL_CREATE);
    if (aclresult != ACLCHECK_OK)
        aclcheck_error(aclresult, OBJECT_TABLESPACE, tablespacename);

    /* pg_global must never be the default tablespace */
    // 确保pg_global不是默认表空间
    if (dst_deftablespace == GLOBALTABLESPACE_OID)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("pg_global cannot be used as default tablespace")));

    /*
     * If we are trying to change the default tablespace of the template,
     * we require that the template not have any files in the new default
     * tablespace. This is necessary because otherwise the copied
     * database would contain pg_class rows that refer to its default
     * tablespace both explicitly (by OID) and implicitly (as zero), which
     * would cause problems. For example another CREATE DATABASE using
     * the copied database as template, and trying to change its default
     * tablespace again, would yield outright incorrect results (it would
     * improperly move tables to the new default tablespace that should
     * stay in the same tablespace).
     */
    // 如果尝试更改模板的默认表空间,要求模板在新的默认表空间中没有任何文件
    if (dst_deftablespace != src_deftablespace)
    {
        char       *srcpath;
        struct stat st;

        // 获取模板数据库在新的默认表空间中的路径
        srcpath = GetDatabasePath(src_dboid, dst_deftablespace);

        // 检查路径是否存在文件,并且不为空
        if (stat(srcpath, &st) == 0 &&
            S_ISDIR(st.st_mode) &&
            !directory_is_empty(srcpath))
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("cannot assign new default tablespace \"%s\"",
                            tablespacename),
                     errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
                               dbtemplate)));

        pfree(srcpath);
    }
}
else
{
    /* Use template database's default tablespace */
    // 使用模板数据库的默认表空间
    dst_deftablespace = src_deftablespace;

    /* Note there is no additional permission check in this path */
    // 注意在这种情况下没有额外的权限检查
}

       模板库连接检查:这里要求除当前会话外,模板库上不能有其他活动连接(允许当前会话自身连在template1上执行CREATE DATABASE),否则可能复制出不一致的数据。文章开头遇到的报错正是在这里抛出的。由于该检查可能需要等待,所以被放在所有基本错误检查的最后。

	/*
	 * The source DB can't have any active backends, except this one
	 * (exception is to allow CREATE DB while connected to template1).
	 * Otherwise we might copy inconsistent data.
	 *
	 * This should be last among the basic error checks, because it involves
	 * potential waiting; we may as well throw an error first if we're gonna
	 * throw one.
	 */
	if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("source database \"%s\" is being accessed by other users",
						dbtemplate),
				 errdetail_busy_db(notherbackends, npreparedxacts)));

3. 新数据库信息填充

为新数据库设置oid

	/*
	 * Select an OID for the new database, checking that it doesn't have a
	 * filename conflict with anything already existing in the tablespace
	 * directories.
	 */
	pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);

	do
	{
		dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
								   Anum_pg_database_oid);
	} while (check_db_file_conflict(dboid));

填充信息至pg_database

	/*
	 * Insert a new tuple into pg_database.  This establishes our ownership of
	 * the new database name (anyone else trying to insert the same name will
	 * block on the unique index, and fail after we commit).
	 */

	/* Form tuple */
	MemSet(new_record, 0, sizeof(new_record));
	MemSet(new_record_nulls, false, sizeof(new_record_nulls));

	new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
	new_record[Anum_pg_database_datname - 1] =
		DirectFunctionCall1(namein, CStringGetDatum(dbname));
	new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
	new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
	new_record[Anum_pg_database_datcollate - 1] =
		DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
	new_record[Anum_pg_database_datctype - 1] =
		DirectFunctionCall1(namein, CStringGetDatum(dbctype));
	new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
	new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
	new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
	new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
	new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
	new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
	new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);

	/*
	 * We deliberately set datacl to default (NULL), rather than copying it
	 * from the template database.  Copying it would be a bad idea when the
	 * owner is not the same as the template's owner.
	 */
	new_record_nulls[Anum_pg_database_datacl - 1] = true;

	tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
							new_record, new_record_nulls);

	CatalogTupleInsert(pg_database_rel, tuple);

4. 收尾工作

强制执行checkpoint,将所有脏页(包括unlogged表的脏页)落盘,保证磁盘上的模板库是最新的;同时处理掉挂起的文件删除(unlink)请求,避免复制过程中文件恰好被删除而导致copydir失败

	/*
	 * Force a checkpoint before starting the copy. This will force all dirty
	 * buffers, including those of unlogged tables, out to disk, to ensure
	 * source database is up-to-date on disk for the copy.
	 * FlushDatabaseBuffers() would suffice for that, but we also want to
	 * process any pending unlink requests. Otherwise, if a checkpoint
	 * happened while we're copying files, a file might be deleted just when
	 * we're about to copy it, causing the lstat() call in copydir() to fail
	 * with ENOENT.
	 */
	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
					  | CHECKPOINT_FLUSH_ALL);

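开始复制子目录后,若中途失败,需要能清理已经复制的目录,因此整个复制过程被包在PG_ENSURE_ERROR_CLEANUP块中:遍历所有表空间(跳过全局表空间pg_global以及模板库在其中没有文件的表空间),用copydir把模板库的目录复制到新库的对应位置,并为每次复制写入一条XLOG_DBASE_CREATE日志记录;全部复制完成后再执行一次checkpoint,并强制同步提交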

	/*
	 * Once we start copying subdirectories, we need to be able to clean 'em
	 * up if we fail.  Use an ENSURE block to make sure this happens.  (This
	 * is not a 100% solution, because of the possibility of failure during
	 * transaction commit after we leave this routine, but it should handle
	 * most scenarios.)
	 */
	fparms.src_dboid = src_dboid;
	fparms.dest_dboid = dboid;
	PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
							PointerGetDatum(&fparms));
	{
		/*
		 * Iterate through all tablespaces of the template database, and copy
		 * each one to the new database.
		 */
		rel = table_open(TableSpaceRelationId, AccessShareLock);
		scan = table_beginscan_catalog(rel, 0, NULL);
		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
			Oid			srctablespace = spaceform->oid;
			Oid			dsttablespace;
			char	   *srcpath;
			char	   *dstpath;
			struct stat st;

			/* No need to copy global tablespace */
			if (srctablespace == GLOBALTABLESPACE_OID)
				continue;

			srcpath = GetDatabasePath(src_dboid, srctablespace);

			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
				directory_is_empty(srcpath))
			{
				/* Assume we can ignore it */
				pfree(srcpath);
				continue;
			}

			if (srctablespace == src_deftablespace)
				dsttablespace = dst_deftablespace;
			else
				dsttablespace = srctablespace;

			dstpath = GetDatabasePath(dboid, dsttablespace);

			/*
			 * Copy this subdirectory to the new location
			 *
			 * We don't need to copy subdirectories
			 */
			copydir(srcpath, dstpath, false);

			/* Record the filesystem change in XLOG */
			{
				xl_dbase_create_rec xlrec;

				xlrec.db_id = dboid;
				xlrec.tablespace_id = dsttablespace;
				xlrec.src_db_id = src_dboid;
				xlrec.src_tablespace_id = srctablespace;

				XLogBeginInsert();
				XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));

				(void) XLogInsert(RM_DBASE_ID,
								  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
			}
		}
		table_endscan(scan);
		table_close(rel, AccessShareLock);

		/*
		 * We force a checkpoint before committing.  This effectively means
		 * that committed XLOG_DBASE_CREATE operations will never need to be
		 * replayed (at least not in ordinary crash recovery; we still have to
		 * make the XLOG entry for the benefit of PITR operations). This
		 * avoids two nasty scenarios:
		 *
		 * #1: When PITR is off, we don't XLOG the contents of newly created
		 * indexes; therefore the drop-and-recreate-whole-directory behavior
		 * of DBASE_CREATE replay would lose such indexes.
		 *
		 * #2: Since we have to recopy the source database during DBASE_CREATE
		 * replay, we run the risk of copying changes in it that were
		 * committed after the original CREATE DATABASE command but before the
		 * system crash that led to the replay.  This is at least unexpected
		 * and at worst could lead to inconsistencies, eg duplicate table
		 * names.
		 *
		 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
		 *
		 * In PITR replay, the first of these isn't an issue, and the second
		 * is only a risk if the CREATE DATABASE and subsequent template
		 * database change both occur while a base backup is being taken.
		 * There doesn't seem to be much we can do about that except document
		 * it as a limitation.
		 *
		 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
		 * we can avoid this.
		 */
		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);

		/*
		 * Close pg_database, but keep lock till commit.
		 */
		table_close(pg_database_rel, NoLock);

		/*
		 * Force synchronous commit, thus minimizing the window between
		 * creation of the database files and committal of the transaction. If
		 * we crash before committing, we'll have a DB that's taking up disk
		 * space but is not in pg_database, which is not good.
		 */
		ForceSyncCommit();
	}
	PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
								PointerGetDatum(&fparms));

	return dboid;
}

最后返回新建db的oid

参考

https://www.postgresql.org/docs/current/sql-createdatabase.html

PostgreSQL Tips: Template Databases - IBM Blog

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐