1/*-------------------------------------------------------------------------
2 *
3 * lockcmds.c
4 * LOCK command support code
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/lockcmds.c
12 *
13 *-------------------------------------------------------------------------
14 */
15#include "postgres.h"
16
17#include "access/table.h"
18#include "access/xact.h"
19#include "catalog/namespace.h"
20#include "catalog/pg_inherits.h"
21#include "commands/lockcmds.h"
22#include "miscadmin.h"
23#include "parser/parse_clause.h"
24#include "storage/lmgr.h"
25#include "utils/acl.h"
26#include "utils/lsyscache.h"
27#include "utils/syscache.h"
28#include "rewrite/rewriteHandler.h"
29#include "nodes/nodeFuncs.h"
30
31static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid);
32static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
33static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
34 Oid oldrelid, void *arg);
35static void LockViewRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, List *ancestor_views);
36
37/*
38 * LOCK TABLE
39 */
40void
41LockTableCommand(LockStmt *lockstmt)
42{
43 ListCell *p;
44
45 /*---------
46 * During recovery we only accept these variations:
47 * LOCK TABLE foo IN ACCESS SHARE MODE
48 * LOCK TABLE foo IN ROW SHARE MODE
49 * LOCK TABLE foo IN ROW EXCLUSIVE MODE
50 * This test must match the restrictions defined in LockAcquireExtended()
51 *---------
52 */
53 if (lockstmt->mode > RowExclusiveLock)
54 PreventCommandDuringRecovery("LOCK TABLE");
55
56 /*
57 * Iterate over the list and process the named relations one at a time
58 */
59 foreach(p, lockstmt->relations)
60 {
61 RangeVar *rv = (RangeVar *) lfirst(p);
62 bool recurse = rv->inh;
63 Oid reloid;
64
65 reloid = RangeVarGetRelidExtended(rv, lockstmt->mode,
66 lockstmt->nowait ? RVR_NOWAIT : 0,
67 RangeVarCallbackForLockTable,
68 (void *) &lockstmt->mode);
69
70 if (get_rel_relkind(reloid) == RELKIND_VIEW)
71 LockViewRecurse(reloid, lockstmt->mode, lockstmt->nowait, NIL);
72 else if (recurse)
73 LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
74 }
75}
76
77/*
78 * Before acquiring a table lock on the named table, check whether we have
79 * permission to do so.
80 */
81static void
82RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
83 void *arg)
84{
85 LOCKMODE lockmode = *(LOCKMODE *) arg;
86 char relkind;
87 char relpersistence;
88 AclResult aclresult;
89
90 if (!OidIsValid(relid))
91 return; /* doesn't exist, so no permissions check */
92 relkind = get_rel_relkind(relid);
93 if (!relkind)
94 return; /* woops, concurrently dropped; no permissions
95 * check */
96
97 /* Currently, we only allow plain tables or views to be locked */
98 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
99 relkind != RELKIND_VIEW)
100 ereport(ERROR,
101 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
102 errmsg("\"%s\" is not a table or view",
103 rv->relname)));
104
105 /*
106 * Make note if a temporary relation has been accessed in this
107 * transaction.
108 */
109 relpersistence = get_rel_persistence(relid);
110 if (relpersistence == RELPERSISTENCE_TEMP)
111 MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE;
112
113 /* Check permissions. */
114 aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
115 if (aclresult != ACLCHECK_OK)
116 aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
117}
118
119/*
120 * Apply LOCK TABLE recursively over an inheritance tree
121 *
122 * We use find_inheritance_children not find_all_inheritors to avoid taking
123 * locks far in advance of checking privileges. This means we'll visit
124 * multiply-inheriting children more than once, but that's no problem.
125 */
126static void
127LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
128{
129 List *children;
130 ListCell *lc;
131
132 children = find_inheritance_children(reloid, NoLock);
133
134 foreach(lc, children)
135 {
136 Oid childreloid = lfirst_oid(lc);
137 AclResult aclresult;
138
139 /* Check permissions before acquiring the lock. */
140 aclresult = LockTableAclCheck(childreloid, lockmode, userid);
141 if (aclresult != ACLCHECK_OK)
142 {
143 char *relname = get_rel_name(childreloid);
144
145 if (!relname)
146 continue; /* child concurrently dropped, just skip it */
147 aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(childreloid)), relname);
148 }
149
150 /* We have enough rights to lock the relation; do so. */
151 if (!nowait)
152 LockRelationOid(childreloid, lockmode);
153 else if (!ConditionalLockRelationOid(childreloid, lockmode))
154 {
155 /* try to throw error by name; relation could be deleted... */
156 char *relname = get_rel_name(childreloid);
157
158 if (!relname)
159 continue; /* child concurrently dropped, just skip it */
160 ereport(ERROR,
161 (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
162 errmsg("could not obtain lock on relation \"%s\"",
163 relname)));
164 }
165
166 /*
167 * Even if we got the lock, child might have been concurrently
168 * dropped. If so, we can skip it.
169 */
170 if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid)))
171 {
172 /* Release useless lock */
173 UnlockRelationOid(childreloid, lockmode);
174 continue;
175 }
176
177 LockTableRecurse(childreloid, lockmode, nowait, userid);
178 }
179}
180
181/*
182 * Apply LOCK TABLE recursively over a view
183 *
184 * All tables and views appearing in the view definition query are locked
185 * recursively with the same lock mode.
186 */
187
188typedef struct
189{
190 LOCKMODE lockmode; /* lock mode to use */
191 bool nowait; /* no wait mode */
192 Oid viewowner; /* view owner for checking the privilege */
193 Oid viewoid; /* OID of the view to be locked */
194 List *ancestor_views; /* OIDs of ancestor views */
195} LockViewRecurse_context;
196
197static bool
198LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
199{
200 if (node == NULL)
201 return false;
202
203 if (IsA(node, Query))
204 {
205 Query *query = (Query *) node;
206 ListCell *rtable;
207
208 foreach(rtable, query->rtable)
209 {
210 RangeTblEntry *rte = lfirst(rtable);
211 AclResult aclresult;
212
213 Oid relid = rte->relid;
214 char relkind = rte->relkind;
215 char *relname = get_rel_name(relid);
216
217 /*
218 * The OLD and NEW placeholder entries in the view's rtable are
219 * skipped.
220 */
221 if (relid == context->viewoid &&
222 (strcmp(rte->eref->aliasname, "old") == 0 ||
223 strcmp(rte->eref->aliasname, "new") == 0))
224 continue;
225
226 /* Currently, we only allow plain tables or views to be locked. */
227 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
228 relkind != RELKIND_VIEW)
229 continue;
230
231 /* Check infinite recursion in the view definition. */
232 if (list_member_oid(context->ancestor_views, relid))
233 ereport(ERROR,
234 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
235 errmsg("infinite recursion detected in rules for relation \"%s\"",
236 get_rel_name(relid))));
237
238 /* Check permissions with the view owner's privilege. */
239 aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner);
240 if (aclresult != ACLCHECK_OK)
241 aclcheck_error(aclresult, get_relkind_objtype(relkind), relname);
242
243 /* We have enough rights to lock the relation; do so. */
244 if (!context->nowait)
245 LockRelationOid(relid, context->lockmode);
246 else if (!ConditionalLockRelationOid(relid, context->lockmode))
247 ereport(ERROR,
248 (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
249 errmsg("could not obtain lock on relation \"%s\"",
250 relname)));
251
252 if (relkind == RELKIND_VIEW)
253 LockViewRecurse(relid, context->lockmode, context->nowait, context->ancestor_views);
254 else if (rte->inh)
255 LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner);
256 }
257
258 return query_tree_walker(query,
259 LockViewRecurse_walker,
260 context,
261 QTW_IGNORE_JOINALIASES);
262 }
263
264 return expression_tree_walker(node,
265 LockViewRecurse_walker,
266 context);
267}
268
269static void
270LockViewRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, List *ancestor_views)
271{
272 LockViewRecurse_context context;
273
274 Relation view;
275 Query *viewquery;
276
277 view = table_open(reloid, NoLock);
278 viewquery = get_view_query(view);
279
280 context.lockmode = lockmode;
281 context.nowait = nowait;
282 context.viewowner = view->rd_rel->relowner;
283 context.viewoid = reloid;
284 context.ancestor_views = lcons_oid(reloid, ancestor_views);
285
286 LockViewRecurse_walker((Node *) viewquery, &context);
287
288 ancestor_views = list_delete_oid(ancestor_views, reloid);
289
290 table_close(view, NoLock);
291}
292
293/*
294 * Check whether the current user is permitted to lock this relation.
295 */
296static AclResult
297LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
298{
299 AclResult aclresult;
300 AclMode aclmask;
301
302 /* Verify adequate privilege */
303 if (lockmode == AccessShareLock)
304 aclmask = ACL_SELECT;
305 else if (lockmode == RowExclusiveLock)
306 aclmask = ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
307 else
308 aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
309
310 aclresult = pg_class_aclcheck(reloid, userid, aclmask);
311
312 return aclresult;
313}
314