// SPDX-License-Identifier: GPL-2.0
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_MDC

#include <linux/module.h>

#include <lustre_intent.h>
#include <obd.h>
#include <obd_class.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_mdc.h>
#include <lustre_net.h>
#include <lustre_req_layout.h>
#include <lustre_swab.h>

#include "mdc_internal.h"

/*
 * Pairs an export with the enqueue info of a getattr request.
 * NOTE(review): no user of this structure is visible in this part of the
 * file; presumably it carries the arguments of a getattr RPC - confirm.
 */
struct mdc_getattr_args {
	struct obd_export *ga_exp;
	struct md_enqueue_info *ga_minfo;
};

/*
 * Report the intent status that applies to a given phase of an open.
 *
 * The dispositions are examined in a fixed order (lease, open, create,
 * lookup executed, intent executed).  The first one that is set on @it
 * decides the result: it->it_status when @phase is at or beyond that
 * disposition, 0 otherwise.
 *
 * It is a bug for none of these dispositions to be set; that case logs the
 * disposition and status and hits LBUG().
 */
int it_open_error(int phase, struct lookup_intent *it)
{
	static const int disp_order[] = {
		DISP_OPEN_LEASE,
		DISP_OPEN_OPEN,
		DISP_OPEN_CREATE,
		DISP_LOOKUP_EXECD,
		DISP_IT_EXECD,
	};
	const int nr_disp = sizeof(disp_order) / sizeof(disp_order[0]);
	int i;

	for (i = 0; i < nr_disp; i++) {
		int disp = disp_order[i];

		if (!it_disposition(it, disp))
			continue;

		/* first disposition found is the only one consulted */
		return phase >= disp ? it->it_status : 0;
	}

	CERROR("it disp: %X, status: %d\n", it->it_disposition,
	       it->it_status);
	LBUG();
	return 0;
}
EXPORT_SYMBOL(it_open_error);

/*
 * Record @data (an inode) as the lr_lvb_inode of the resource behind @lockh.
 *
 * This must be called on a lockh that is known to have a referenced lock.
 *
 * @exp:   not used by this function
 * @lockh: lock handle; an unused handle makes the call a no-op
 * @data:  inode to store in the resource
 * @bits:  optional; set to 0 first, then to the lock's inodebits once the
 *         inode has been stored
 *
 * If the resource already points at a different inode, that inode is
 * asserted to be in I_FREEING state.
 *
 * Return: always 0.
 */
int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
		      void *data, __u64 *bits)
{
	struct inode *new_inode = data;
	struct ldlm_resource *res;
	struct ldlm_lock *lock;

	if (bits)
		*bits = 0;

	if (!lustre_handle_is_used(lockh))
		return 0;

	lock = ldlm_handle2lock(lockh);
	LASSERT(lock);

	lock_res_and_lock(lock);
	/* only look at the resource once it is locked */
	res = lock->l_resource;

	if (res->lr_lvb_inode && res->lr_lvb_inode != data) {
		struct inode *old_inode = res->lr_lvb_inode;

		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}

	res->lr_lvb_inode = new_inode;
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;
	unlock_res_and_lock(lock);

	/* drop the reference taken by ldlm_handle2lock() */
	LDLM_LOCK_PUT(lock);

	return 0;
}

/*
 * Try to match an existing lock on the resource named after @fid.
 *
 * @policy is modified in place: inodebits that exp_connect_ibits(@exp) does
 * not report are masked off before the match is attempted.
 *
 * Return: the value of ldlm_lock_match() on the namespace of @exp.
 */
enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
			      const struct lu_fid *fid, enum ldlm_type type,
			      union ldlm_policy_data *policy,
			      enum ldlm_mode mode,
			      struct lustre_handle *lockh)
{
	struct ldlm_res_id res_id;

	/* LU-4405: Clear bits not supported by server */
	policy->l_inodebits.bits &= exp_connect_ibits(exp);

	fid_build_reg_res_name(fid, &res_id);

	return ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
			       &res_id, type, policy, mode, lockh, 0);
}

int mdc_cancel_unused(struct obd_export *exp,
		      const struct lu_fid *fid,
		      union ldlm_policy_data *policy,
		      enum ldlm_mode mode,
		      enum ldlm_cancel_flags flags,
		      void *opaque)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	fid_build_reg_res_name(fid, &res_id);
	rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
					     policy, mode, flags, opaque);
	return rc;
}

/*
 * Clear the lr_lvb_inode pointer of the resource named after @fid.
 *
 * The namespace of @exp must exist (asserted).  If the resource lookup
 * fails nothing is changed.
 *
 * Return: always 0.
 */
int mdc_null_inode(struct obd_export *exp,
		   const struct lu_fid *fid)
{
	struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
	struct ldlm_resource *res;
	struct ldlm_res_id res_id;

	LASSERTF(ns, "no namespace passed\n");

	fid_build_reg_res_name(fid, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);

	if (!IS_ERR(res)) {
		lock_res(res);
		res->lr_lvb_inode = NULL;
		unlock_res(res);

		/* release the reference obtained by the lookup above */
		ldlm_resource_putref(res);
	}

	return 0;
}

/*
 * Make sure @req is not kept for replay.
 *
 * rq_replay is cleared under rq_lock if it is set.  Afterwards, a non-zero
 * @rc combined with a non-zero transno on the request is treated as a bug.
 */
static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
	/* Don't hold error requests for replay. */
	if (req->rq_replay) {
		spin_lock(&req->rq_lock);
		req->rq_replay = 0;
		spin_unlock(&req->rq_lock);
	}

	/* nothing more to verify unless an error came with a transno */
	if (!rc || req->rq_transno == 0)
		return;

	DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
	LBUG();
}

/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
 * buffer and may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and questionable whether the client
 * could do MDS recovery under OOM anyways...
 *
 * @req:  open request whose EA segment is enlarged to body->mbo_eadatasize
 * @body: reply body; if the enlargement fails, OBD_MD_FLEASIZE is cleared
 *        from mbo_valid and mbo_eadatasize is reset to 0
 */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
				struct mdt_body *body)
{
	int rc;

	/* FIXME: remove this explicit offset. */
	rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
					body->mbo_eadatasize);
	if (rc) {
		/* the EA size is an unsigned 32-bit value: print it with %u */
		CERROR("Can't enlarge segment %d size to %u\n",
		       DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
		/* no room for the EA: mark the body as not carrying one */
		body->mbo_valid &= ~OBD_MD_FLEASIZE;
		body->mbo_eadatasize = 0;
	}
}

/*
 * Build the LDLM intent request for an open.
 *
 * Unused locks are first collected for cancellation: OPEN locks on the
 * child (op_fid2, only when that fid is sane) and UPDATE locks on the
 * parent (op_fid1).  The request is then allocated, sized, flagged for
 * replay according to the import, and packed with the intent and the open
 * record.
 *
 * Side effect: the file-type bits of it->it_create_mode are forced to
 * S_IFREG.
 *
 * Return: the prepared request, or an ERR_PTR() (-ENOMEM if allocation
 * fails, or the negative result of ldlm_prep_enqueue_req()).
 */
static struct ptlrpc_request *
mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
		     struct md_op_data *op_data)
{
	struct obd_device *obddev = class_exp2obd(exp);
	const void *lmm = op_data->op_data;
	u32 lmmsize = op_data->op_data_size;
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	LIST_HEAD(cancels);
	int count = 0;
	int mode;
	int rc;

	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) {
			/* try to get lease */
			mode = (it->it_flags & FMODE_WRITE) ? LCK_EX : LCK_PR;
		} else if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC)) {
			mode = LCK_CW;
		} else if (it->it_flags & __FMODE_EXEC) {
			mode = LCK_PR;
		} else {
			mode = LCK_CR;
		}

		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	mode = (it->it_op & IT_CREAT) ? LCK_EX : LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (!req) {
		/* nobody will send the cancels: drop the collected locks */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	/* client-side buffers: name (plus one extra byte) and EA */
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	/* reply buffer for the MD is sized for the largest known EA */
	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_max_mds_easize);

	ptlrpc_request_set_replen(req);
	return req;
}

/*
 * Build the LDLM intent request for IT_GETXATTR.
 *
 * The three server-side xattr buffers (EADATA, EAVALS, EAVALS_LENS) are all
 * sized to ocd_max_easize from the import's connect data.  No locks are
 * queued for cancellation: the cancel list handed to
 * ldlm_prep_enqueue_req() is empty and its count is 0.
 *
 * @it is not read here; the intent opcode is always IT_GETXATTR.
 *
 * Return: the prepared request, or an ERR_PTR() (-ENOMEM if allocation
 * fails, or the non-zero result of ldlm_prep_enqueue_req()).
 */
static struct ptlrpc_request *
mdc_intent_getxattr_pack(struct obd_export *exp,
			 struct lookup_intent *it,
			 struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	LIST_HEAD(cancels);
	u32 maxdata;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETXATTR);
	if (!req)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
	if (rc)
		goto err_free;

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = IT_GETXATTR;

	maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;

	/* pack the intended request */
	mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
		      0);

	/* every xattr reply buffer gets the same upper bound */
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
	req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
			     RCL_SERVER, maxdata);

	ptlrpc_request_set_replen(req);

	return req;

err_free:
	ptlrpc_request_free(req);
	return ERR_PTR(rc);
}

/*
 * Build the LDLM intent request for an unlink.
 *
 * The client name buffer is sized to op_namelen + 1, the intent opcode is
 * taken from it->it_op, and the server MD buffer is sized to the default
 * MDS EA size.
 *
 * Return: the prepared request, or an ERR_PTR() (-ENOMEM if allocation
 * fails, or the non-zero result of ldlm_prep_enqueue_req()).
 */
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *op_data)
{
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_UNLINK);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc)
		goto err_free;

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_unlink_pack(req, op_data);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);

	return req;

err_free:
	ptlrpc_request_free(req);
	return ERR_PTR(rc);
}

static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
						      struct lookup_intent *it,
						     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device     *obddev = class_exp2obd(exp);
	u64		       valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
				       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
				       OBD_MD_MEA | OBD_MD_FLACL;
	struct ldlm_intent    *lit;
	int		    rc;
	u32 easize;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_GETATTR);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	if (obddev->u.cli.cl_default_mds_easize > 0)
		easize = obddev->u.cli.cl_default_mds_easize;
	else
		easize = obddev->u.cli.cl_max_mds_easize;

	/* pack the intended request */
	mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);

	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
	ptlrpc_request_set_replen(req);
	return req;
}

/*
 * Build the LDLM intent request for a layout intent.
 *
 * The client EADATA buffer is sized to 0, the layout opcode is always
 * LAYOUT_INTENT_ACCESS, and the server LVB buffer is sized to the default
 * MDS EA size.  The third parameter is not used.
 *
 * Return: the prepared request, or an ERR_PTR() (-ENOMEM if allocation
 * fails, or the non-zero result of ldlm_prep_enqueue_req()).
 */
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *unused)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct layout_intent *layout;
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_LAYOUT);
	if (!req)
		return ERR_PTR(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc)
		goto err_free;

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the layout intent request */
	layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
	/* LAYOUT_INTENT_ACCESS is generic, specific operation will be
	 * set for replication
	 */
	layout->li_opc = LAYOUT_INTENT_ACCESS;

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     obd->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);

	return req;

err_free:
	ptlrpc_request_free(req);
	return ERR_PTR(rc);
}

/*
 * Build a plain LDLM enqueue request (no intent payload).
 *
 * @lvb_len: size reserved for the LVB in the server reply
 *
 * Return: the prepared request, or an ERR_PTR() (-ENOMEM if allocation
 * fails, or the non-zero result of ldlm_prep_enqueue_req()).
 */
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
	struct ptlrpc_request *req;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
	if (!req)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc)
		goto err_free;

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
	ptlrpc_request_set_replen(req);

	return req;

err_free:
	ptlrpc_request_free(req);
	return ERR_PTR(rc);
}

/*
 * Post-process the reply of an intent enqueue.
 *
 * In order, this function:
 *  - marks the lock request as intent-only when the request carries a
 *    transno or is flagged for replay;
 *  - on ELDLM_LOCK_ABORTED reports "no lock" (mode 0, zeroed handle) and
 *    treats the enqueue as successful; otherwise switches the reference on
 *    the lock to its l_req_mode when that differs from einfo->ei_mode;
 *  - copies disposition and status from the DLM reply into @it and records
 *    lock mode, lock handle and @req in @it;
 *  - clears the replay flag of requests that must not be replayed;
 *  - checks that the reply buffers expected for the intent are present and,
 *    for a replayable open, copies the reply EA into the request buffer;
 *  - if the lock has a layout, installs a private copy of the layout data
 *    as the lock's LVB unless the lock already has LVB data.
 *
 * @exp:   export; only passed to mdc_update_max_ea_from_body()
 * @req:   request whose reply is examined; stored in it->it_request
 * @einfo: enqueue info; ei_mode may be rewritten as described above
 * @it:    intent that receives the results
 * @lockh: lock handle from the enqueue; zeroed on ELDLM_LOCK_ABORTED
 * @rc:    result of the enqueue, asserted to be non-negative
 *
 * Return: 0 when @rc was ELDLM_LOCK_ABORTED, otherwise @rc unchanged, on
 * success; -EPROTO if an expected reply buffer is missing; -ENOMEM if the
 * layout copy cannot be allocated.
 */
static int mdc_finish_enqueue(struct obd_export *exp,
			      struct ptlrpc_request *req,
			      struct ldlm_enqueue_info *einfo,
			      struct lookup_intent *it,
			      struct lustre_handle *lockh,
			      int rc)
{
	struct req_capsule  *pill = &req->rq_pill;
	struct ldlm_request *lockreq;
	struct ldlm_reply   *lockrep;
	struct ldlm_lock    *lock;
	void		*lvb_data = NULL;
	u32 lvb_len = 0;

	LASSERT(rc >= 0);
	/* Similarly, if we're going to replay this request, we don't want to
	 * actually get a lock, just perform the intent.
	 */
	if (req->rq_transno || req->rq_replay) {
		lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
		lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
	}

	if (rc == ELDLM_LOCK_ABORTED) {
		/* report "no lock" to the caller and carry on as success */
		einfo->ei_mode = 0;
		memset(lockh, 0, sizeof(*lockh));
		rc = 0;
	} else { /* rc = 0 */
		lock = ldlm_handle2lock(lockh);

		/* If the server gave us back a different lock mode, we should
		 * fix up our variables.
		 */
		if (lock->l_req_mode != einfo->ei_mode) {
			ldlm_lock_addref(lockh, lock->l_req_mode);
			ldlm_lock_decref(lockh, einfo->ei_mode);
			einfo->ei_mode = lock->l_req_mode;
		}
		LDLM_LOCK_PUT(lock);
	}

	lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);

	/* hand the server's verdict and the lock description to the intent */
	it->it_disposition = (int)lockrep->lock_policy_res1;
	it->it_status = (int)lockrep->lock_policy_res2;
	it->it_lock_mode = einfo->ei_mode;
	it->it_lock_handle = lockh->cookie;
	it->it_request = req;

	/* Technically speaking rq_transno must already be zero if
	 * it_status is in error, so the check is a bit redundant
	 */
	if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
		mdc_clear_replay_flag(req, it->it_status);

	/* If we're doing an IT_OPEN which did not result in an actual
	 * successful open, then we need to remove the bit which saves
	 * this request for unconditional replay.
	 *
	 * It's important that we do this first!  Otherwise we might exit the
	 * function without doing so, and try to replay a failed create
	 * (bug 3440)
	 */
	if (it->it_op & IT_OPEN && req->rq_replay &&
	    (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
		mdc_clear_replay_flag(req, it->it_status);

	DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
		  it->it_op, it->it_disposition, it->it_status);

	/* We know what to expect, so we do any byte flipping required here */
	if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
		struct mdt_body *body;

		body = req_capsule_server_get(pill, &RMF_MDT_BODY);
		if (!body) {
			CERROR("Can't swab mdt_body\n");
			return -EPROTO;
		}

		if (it_disposition(it, DISP_OPEN_OPEN) &&
		    !it_open_error(DISP_OPEN_OPEN, it)) {
			/*
			 * If this is a successful OPEN request, we need to set
			 * replay handler and data early, so that if replay
			 * happens immediately after swabbing below, new reply
			 * is swabbed by that handler correctly.
			 */
			mdc_set_open_replay_data(NULL, NULL, it);
		}

		if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
			void *eadata;

			mdc_update_max_ea_from_body(exp, body);

			/*
			 * The eadata is opaque; just check that it is there.
			 * Eventually, obd_unpackmd() will check the contents.
			 */
			eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
							      body->mbo_eadatasize);
			if (!eadata)
				return -EPROTO;

			/* save lvb data and length in case this is for layout
			 * lock
			 */
			lvb_data = eadata;
			lvb_len = body->mbo_eadatasize;

			/*
			 * We save the reply LOV EA in case we have to replay a
			 * create for recovery.  If we didn't allocate a large
			 * enough request buffer above we need to reallocate it
			 * here to hold the actual LOV EA.
			 *
			 * To not save LOV EA if request is not going to replay
			 * (for example error one).
			 */
			if ((it->it_op & IT_OPEN) && req->rq_replay) {
				void *lmm;

				/* grow or shrink the request EA buffer to fit */
				if (req_capsule_get_size(pill, &RMF_EADATA,
							 RCL_CLIENT) <
				    body->mbo_eadatasize)
					mdc_realloc_openmsg(req, body);
				else
					req_capsule_shrink(pill, &RMF_EADATA,
							   body->mbo_eadatasize,
							   RCL_CLIENT);

				/*
				 * mdc_realloc_openmsg() resets mbo_eadatasize
				 * to 0 on failure, so the size set and the
				 * amount copied below are 0 in that case.
				 */
				req_capsule_set_size(pill, &RMF_EADATA,
						     RCL_CLIENT,
						     body->mbo_eadatasize);

				lmm = req_capsule_client_get(pill, &RMF_EADATA);
				if (lmm)
					memcpy(lmm, eadata, body->mbo_eadatasize);
			}
		}
	} else if (it->it_op & IT_LAYOUT) {
		/* maybe the lock was granted right away and layout
		 * is packed into RMF_DLM_LVB of req
		 */
		lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
		if (lvb_len > 0) {
			lvb_data = req_capsule_server_sized_get(pill,
								&RMF_DLM_LVB,
								lvb_len);
			if (!lvb_data)
				return -EPROTO;
		}
	}

	/* fill in stripe data for layout lock */
	lock = ldlm_handle2lock(lockh);
	if (lock && ldlm_has_layout(lock) && lvb_data) {
		void *lmm;

		LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
			   ldlm_it2str(it->it_op), lvb_len);

		/* copy made before taking the resource lock */
		lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
		if (!lmm) {
			LDLM_LOCK_PUT(lock);
			return -ENOMEM;
		}
		memcpy(lmm, lvb_data, lvb_len);

		/* install lvb_data */
		lock_res_and_lock(lock);
		if (!lock->l_lvb_data) {
			lock->l_lvb_type = LVB_T_LAYOUT;
			lock->l_lvb_data = lmm;
			lock->l_lvb_len = lvb_len;
			/* ownership moved to the lock: do not free below */
			lmm = NULL;
		}
		unlock_res_and_lock(lock);
		/* the lock already had LVB data: discard our copy */
		if (lmm)
			kvfree(lmm);
	}
	if (lock)
		LDLM_LOCK_PUT(lock);

	return rc;
}

/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * Enqueue a lock, with an intent (@it != NULL) or without one (flock).
 *
 * With an intent, @policy must be NULL: the inodebits policy is chosen from
 * it->it_op, a request is packed by the matching mdc_intent_*_pack() helper,
 * and the reply is post-processed by mdc_finish_enqueue().  When the server
 * answers the intent with -EINPROGRESS the whole request is rebuilt and sent
 * again, for as long as the import generation has not changed.
 *
 * Without an intent, einfo->ei_type must be LDLM_FLOCK; the result of
 * ldlm_cli_enqueue() is returned directly, except that an LCK_NL request
 * failing with -EINTR or -ETIMEDOUT is retried.
 *
 * Return: 0 or a positive value from mdc_finish_enqueue() on success (the
 * request is then left in it->it_request); otherwise a negative errno:
 * -EOPNOTSUPP for a layout intent when imp_connect_lvb_type() is false,
 * -EIO when the import generation changed across a resend, the error of the
 * pack helper, of obd_get_request_slot(), of ldlm_cli_enqueue() or of
 * mdc_finish_enqueue().
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		const union ldlm_policy_data *policy,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, u64 extra_lock_flags)
{
	static const union ldlm_policy_data lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const union ldlm_policy_data update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const union ldlm_policy_data layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const union ldlm_policy_data getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req = NULL;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);
	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	if (it) {
		LASSERT(!policy);

		/* pick the inodebits to lock from the kind of intent */
		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
		else
			policy = &lookup_policy;
	}

	/* remembered so that a resend can detect a changed import generation */
	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	/* every attempt starts from the caller's flags, not the last result */
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK. */
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		res_id.name[3] = LDLM_FLOCK;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data);
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	/* req is still NULL here in the flock case, which is not an error */
	if (IS_ERR(req))
		return PTR_ERR(req);

	if (resends) {
		/*
		 * Pin the request to the generation sampled before the first
		 * attempt and set rq_sent 'resends' seconds past the current
		 * time.
		 */
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = ktime_get_real_seconds() + resends;
	}

	/* It is important to obtain modify RPC slot first (if applicable), so
	 * that threads that are waiting for a modify RPC slot are not polluting
	 * our rpcs in flight counter.
	 * We do not do flock request limiting, though
	 */
	if (it) {
		mdc_get_mod_rpc_slot(req, it);
		rc = obd_get_request_slot(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_mod_rpc_slot(req, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we immediately return without further
		 * delay and let caller deal with the rest, since rest of
		 * this function metadata processing makes no sense for flock
		 * requests anyway. But in case of problem during comms with
		 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
		 * can not rely on caller and this mainly for F_UNLCKs
		 * (explicits or automatically generated by Kernel to clean
		 * current FLocks upon exit) that can't be trashed
		 */
		if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
		    (einfo->ei_type == LDLM_FLOCK) &&
		    (einfo->ei_mode == LCK_NL))
			goto resend;
		return rc;
	}

	/* release the slots in the reverse order of acquisition */
	obd_put_request_slot(&obddev->u.cli);
	mdc_put_mod_rpc_slot(req, it);

	if (rc < 0) {
		CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
		       obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

	/* convert the intent status in place; mdc_finish_enqueue() reads it */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/*
	 * Retry infinitely when the server returns -EINPROGRESS for the
	 * intent operation, when server returns -EINPROGRESS for acquiring
	 * intent lock, we'll retry in after_reply().
	 */
	if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d " DFID "/" DFID "\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		/* only resend while the import generation is unchanged */
		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		/* undo what the enqueue handed out: lock reference and request */
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->it_lock_handle = 0;
		it->it_lock_mode = 0;
		it->it_request = NULL;
	}

	return rc;
}

/*
 * Finish an intent lock once the enqueue reply is available.
 *
 * Nothing is done for IT_READDIR.  Otherwise the intent status is checked
 * for the "intent executed" and "lookup executed" phases and the first
 * error found is returned.  On success, an extra reference is taken on
 * @request for a successful create and for a successful open (each at most
 * once, tracked with the DISP_ENQ_*_REF dispositions), and if
 * ldlm_lock_match() finds a lock for the new lock's policy, the new lock is
 * dropped and cancelled and @lockh / it->it_lock_handle are switched to the
 * matched lock.
 *
 * @exp:     not used by this function
 * @request: enqueue request holding the reply
 * @op_data: only used for the debug message (name of the dentry)
 * @it:      intent carrying disposition, status and lock mode
 * @lockh:   handle of the newly granted lock; may be replaced
 *
 * Return: 0 on success, otherwise the intent status reported by
 * it_open_error() or it->it_status.
 */
static int mdc_finish_intent_lock(struct obd_export *exp,
				  struct ptlrpc_request *request,
				  struct md_op_data *op_data,
				  struct lookup_intent *it,
				  struct lustre_handle *lockh)
{
	struct lustre_handle old_lock;
	struct mdt_body *mdt_body;
	struct ldlm_lock *lock;
	int rc;

	LASSERT(request != LP_POISON);
	LASSERT(request->rq_repmsg != LP_POISON);

	if (it->it_op & IT_READDIR)
		return 0;

	if (!it_disposition(it, DISP_IT_EXECD)) {
		/* The server failed before it even started executing the
		 * intent, i.e. because it couldn't unpack the request.
		 */
		LASSERT(it->it_status != 0);
		return it->it_status;
	}
	rc = it_open_error(DISP_IT_EXECD, it);
	if (rc)
		return rc;

	mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
	LASSERT(mdt_body);      /* mdc_enqueue checked */

	rc = it_open_error(DISP_LOOKUP_EXECD, it);
	if (rc)
		return rc;

	/* keep requests around for the multiple phases of the call
	 * this shows the DISP_XX must guarantee we make it into the call
	 */
	if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
	    it_disposition(it, DISP_OPEN_CREATE) &&
	    !it_open_error(DISP_OPEN_CREATE, it)) {
		it_set_disposition(it, DISP_ENQ_CREATE_REF);
		ptlrpc_request_addref(request); /* balanced in ll_create_node */
	}
	if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
	    it_disposition(it, DISP_OPEN_OPEN) &&
	    !it_open_error(DISP_OPEN_OPEN, it)) {
		it_set_disposition(it, DISP_ENQ_OPEN_REF);
		ptlrpc_request_addref(request); /* balanced in ll_file_open */
		/* BUG 11546 - eviction in the middle of open rpc processing */
		OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
	}

	/* sanity-check the combination of intent op and disposition */
	if (it->it_op & IT_CREAT)
		/* XXX this belongs in ll_create_it */
		;
	else if (it->it_op == IT_OPEN)
		LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
	else
		LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));

	/* If we already have a matching lock, then cancel the new
	 * one.  We have to set the data here instead of in
	 * mdc_enqueue, because we need to use the child's inode as
	 * the l_ast_data to match, and that's not available until
	 * intent_finish has performed the iget().)
	 */
	lock = ldlm_handle2lock(lockh);
	if (lock) {
		/* copied so it can still be used after the lock is put */
		union ldlm_policy_data policy = lock->l_policy_data;

		LDLM_DEBUG(lock, "matching against this");

		LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
					 &lock->l_resource->lr_name),
			 "Lock res_id: " DLDLMRES ", fid: " DFID "\n",
			 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
		LDLM_LOCK_PUT(lock);

		/*
		 * old_lock starts as a copy of the new handle; on a match it
		 * is used as the handle of the lock that was found.
		 * NOTE(review): presumably ldlm_lock_match() with a NULL
		 * namespace and resource derives both from this handle -
		 * confirm against its implementation.
		 */
		memcpy(&old_lock, lockh, sizeof(*lockh));
		if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
				    LDLM_IBITS, &policy, LCK_NL,
				    &old_lock, 0)) {
			ldlm_lock_decref_and_cancel(lockh,
						    it->it_lock_mode);
			memcpy(lockh, &old_lock, sizeof(old_lock));
			it->it_lock_handle = lockh->cookie;
		}
	}
	CDEBUG(D_DENTRY,
	       "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
	return rc;
}

/*
 * Check whether a lock usable for intent @it is already held.
 *
 * If @it already carries a lock handle, that handle is revalidated with
 * ldlm_revalidate_lock_handle(), which is also handed @bits.  Otherwise a
 * lock on @fid is searched for with mdc_lock_match(), using inodebits
 * selected from the intent operation.
 *
 * On return, it->it_lock_handle and it->it_lock_mode describe the lock
 * that was found, or are both zero when there is none.
 *
 * Returns 1 if a lock was found, 0 otherwise.
 */
int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
			struct lu_fid *fid, __u64 *bits)
{
	struct lustre_handle handle;
	enum ldlm_mode found;

	/* Returning 1 right away would be possible, but the caller
	 * (revalidate_it) is only expected to get here when a lock is
	 * already held, so that expectation is verified instead.
	 */
	if (it->it_lock_handle) {
		handle.cookie = it->it_lock_handle;
		found = ldlm_revalidate_lock_handle(&handle, bits);
	} else {
		struct ldlm_res_id res_id;
		union ldlm_policy_data ibits;

		fid_build_reg_res_name(fid, &res_id);

		if (it->it_op == IT_GETATTR) {
			/* File attributes are spread over several bits:
			 * nlink lives under the LOOKUP lock, size and times
			 * under the UPDATE lock, and owner/group/acl (which
			 * used to sit under LOOKUP) under a separate
			 * permissions lock.  Getattr has to return all of
			 * it, so every one of those locks is required.
			 * When the bits are split across several locks
			 * there is no easy way to match them all here, so
			 * for now an extra RPC fetches them in one go.
			 *
			 * For new MDTs (> 2.4) UPDATE|PERM would suffice,
			 * but old MDTs (< 2.4) cover permission with the
			 * LOOKUP lock, hence all bits are matched.
			 */
			ibits.l_inodebits.bits = MDS_INODELOCK_UPDATE |
						 MDS_INODELOCK_LOOKUP |
						 MDS_INODELOCK_PERM;
		} else if (it->it_op == IT_READDIR) {
			ibits.l_inodebits.bits = MDS_INODELOCK_UPDATE;
		} else if (it->it_op == IT_LAYOUT) {
			ibits.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
		} else {
			ibits.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
		}

		found = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
				       LDLM_IBITS, &ibits,
				       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
				       &handle);
	}

	/* A zero mode means no lock: both intent fields end up zero. */
	it->it_lock_handle = found ? handle.cookie : 0;
	it->it_lock_mode = found;

	return found != 0;
}

/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 * child lookup.
 *
 * Parameters:
 *   exp              - export the enqueue is issued on
 *   op_data          - operation data; op_fid2 may be allocated here for
 *                      IT_CREAT when it is not sane on entry
 *   it               - intent to execute, must not be NULL
 *   reqp             - set to it->it_request once the enqueue succeeded;
 *                      left untouched on the early-return paths
 *   cb_blocking      - blocking callback stored in the enqueue info
 *   extra_lock_flags - passed through to mdc_enqueue()
 *
 * Returns the result of mdc_revalidate_lock() when the request is
 * answered from an already held lock (or when revalidation fails for a
 * by-name operation), a negative value if fid allocation or the enqueue
 * fails, and otherwise the result of mdc_finish_intent_lock().
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
{
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_IBITS,
		.ei_cb_bl	= cb_blocking,
		.ei_cb_cp	= ldlm_completion_ast,
	};
	struct lustre_handle lockh;
	int rc = 0;

	/* Assert before @it is first used: the lock mode is derived from
	 * the intent, so it is filled in only after the check.
	 */
	LASSERT(it);
	einfo.ei_mode = it_to_lock_mode(it);

	CDEBUG(D_DLMTRACE, "(name: %.*s," DFID ") in obj " DFID
		", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
		op_data->op_name, PFID(&op_data->op_fid2),
		PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
		it->it_flags);

	lockh.cookie = 0;
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 * verify that.
		 */
		it->it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		 * (from inode_revalidate)
		 */
		if (rc || op_data->op_namelen != 0)
			return rc;
	}

	/* For case if upper layer did not alloc fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_CREAT)) {
		rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}

	rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
			 extra_lock_flags);
	if (rc < 0)
		return rc;

	/* Hand the request attached to the intent back to the caller. */
	*reqp = it->it_request;
	return mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
}

/*
 * Reply interpreter installed by mdc_intent_getattr_async().
 *
 * Gives back the request slot, completes the lock enqueue, converts the
 * status in the DLM reply to host representation and then finishes the
 * intent.  Whatever the outcome, the final status is handed to the
 * minfo->mi_cb callback together with @req and the enqueue info.
 *
 * @env is unused.  @args holds the struct mdc_getattr_args for the
 * request; @rc is the incoming request status.
 *
 * Always returns 0; the real result is delivered through the callback.
 */
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
					      struct ptlrpc_request *req,
					      void *args, int rc)
{
	struct mdc_getattr_args *ga = args;
	struct obd_export *exp = ga->ga_exp;
	struct md_enqueue_info *minfo = ga->ga_minfo;
	struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
	struct lookup_intent *it = &minfo->mi_it;
	struct lustre_handle *lockh = &minfo->mi_lockh;
	struct obd_device *obddev = class_exp2obd(exp);
	__u64 flags = LDLM_FL_HAS_INTENT;

	obd_put_request_slot(&obddev->u.cli);

	/* Fault injection: pretend the enqueue timed out. */
	if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
		rc = -ETIMEDOUT;

	rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
				   &flags, NULL, 0, lockh, rc);
	if (rc < 0) {
		CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
		mdc_clear_replay_flag(req, rc);
	} else {
		struct ldlm_reply *reply;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

		/* Convert the reply status before it gets interpreted. */
		reply->lock_policy_res2 =
			ptlrpc_status_ntoh(reply->lock_policy_res2);

		rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
		if (!rc)
			rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data,
						    it, lockh);
	}

	/* The callback is invoked on success and failure alike. */
	minfo->mi_cb(req, minfo, rc);
	return 0;
}

int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo)
{
	struct md_op_data       *op_data = &minfo->mi_data;
	struct lookup_intent    *it = &minfo->mi_it;
	struct ptlrpc_request   *req;
	struct mdc_getattr_args *ga;
	struct obd_device       *obddev = class_exp2obd(exp);
	struct ldlm_res_id       res_id;
	union ldlm_policy_data policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
	};
	int		      rc = 0;
	__u64		    flags = LDLM_FL_HAS_INTENT;

	CDEBUG(D_DLMTRACE,
	       "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	req = mdc_intent_getattr_pack(exp, it, op_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	rc = obd_get_request_slot(&obddev->u.cli);
	if (rc != 0) {
		ptlrpc_req_finished(req);
		return rc;
	}

	rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
			      &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
	if (rc < 0) {
		obd_put_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);
		return rc;
	}

	BUILD_BUG_ON(sizeof(*ga) > sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_exp = exp;
	ga->ga_minfo = minfo;

	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req);

	return 0;
}