#include <linux/module.h>
#include <linux/slab.h>
#include <linux/drbd.h>
#include "drbd_int.h"
#include "drbd_req.h"
/* Forward declaration; the definition appears further down in this file. */
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);
/*
 * Allocate a request from drbd_request_mempool and set it up to track
 * @bio_src on @device.
 *
 * Returns the new request, or NULL if the allocation returned NULL.
 * The request starts out with completion_ref == 1 and a freshly
 * initialised kref.
 */
static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
{
	struct drbd_request *new_req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);

	if (!new_req)
		return NULL;

	/* Start from an all-zero object; rq_state is built up from 0 below. */
	memset(new_req, 0, sizeof(*new_req));

	/* Initial state flags follow the direction and operation of the bio. */
	if (bio_data_dir(bio_src) == WRITE)
		new_req->rq_state |= RQ_WRITE;
	if (bio_op(bio_src) == REQ_OP_WRITE_ZEROES)
		new_req->rq_state |= RQ_ZEROES;
	if (bio_op(bio_src) == REQ_OP_DISCARD)
		new_req->rq_state |= RQ_UNMAP;

	new_req->device = device;
	new_req->master_bio = bio_src;
	new_req->epoch = 0;

	/* The interval mirrors the sector range covered by the bio. */
	drbd_clear_interval(&new_req->i);
	new_req->i.sector = bio_src->bi_iter.bi_sector;
	new_req->i.size = bio_src->bi_iter.bi_size;
	new_req->i.local = true;
	new_req->i.waiting = false;

	INIT_LIST_HEAD(&new_req->tl_requests);
	INIT_LIST_HEAD(&new_req->w.list);
	INIT_LIST_HEAD(&new_req->req_pending_master_completion);
	INIT_LIST_HEAD(&new_req->req_pending_local);

	atomic_set(&new_req->completion_ref, 1);
	kref_init(&new_req->kref);

	return new_req;
}
/*
 * Remove the interval of @req from the tree @root and, if the interval is
 * flagged as being waited on, wake up the device's misc_wait queue.
 */
static void drbd_remove_request_interval(struct rb_root *root,
					 struct drbd_request *req)
{
	struct drbd_device *device = req->device;

	drbd_remove_interval(root, &req->i);

	if (!req->i.waiting)
		return;

	wake_up(&device->misc_wait);
}
/*
 * kref release callback for a struct drbd_request.
 *
 * Logs a "Logic BUG" and returns WITHOUT freeing if the request still has a
 * master bio (and is not postponed), a non-zero completion_ref, pending
 * local I/O, or network state that has not reached RQ_NET_DONE.
 * Otherwise it unlinks the request from the list its tl_requests node is
 * on, removes its interval from the matching tree, updates the in-sync /
 * out-of-sync marking and the activity log for writes, and returns the
 * object to drbd_request_mempool.
 */
void drbd_req_destroy(struct kref *kref)
{
struct drbd_request *req = container_of(kref, struct drbd_request, kref);
struct drbd_device *device = req->device;
/* Snapshot of the state flags; all decisions below use this copy. */
const unsigned s = req->rq_state;
/* Sanity check: refuse to free a request that still looks in use. */
if ((req->master_bio && !(s & RQ_POSTPONED)) ||
atomic_read(&req->completion_ref) ||
(s & RQ_LOCAL_PENDING) ||
((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
s, atomic_read(&req->completion_ref));
return;
}
list_del_init(&req->tl_requests);
/* Writes live in write_requests, everything else in read_requests. */
if (!drbd_interval_empty(&req->i)) {
struct rb_root *root;
if (s & RQ_WRITE)
root = &device->write_requests;
else
root = &device->read_requests;
drbd_remove_request_interval(root, req);
/* Network state flags are set on a non-empty request, yet no interval was inserted. */
} else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
s, (unsigned long long)req->i.sector, req->i.size);
if (s & RQ_WRITE) {
struct drbd_peer_device *peer_device = first_peer_device(device);
/* Skip the sync bookkeeping when RQ_POSTPONED is the only one of these flags set. */
if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
/* Not OK on both sides: mark the range out of sync. */
if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
/* OK on both sides and RQ_NET_SIS set: mark the range in sync. */
if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
drbd_set_in_sync(peer_device, req->i.sector, req->i.size);
}
/* Finish the activity-log I/O if an ldev reference can be taken. */
if (s & RQ_IN_ACT_LOG) {
if (get_ldev_if_state(device, D_FAILED)) {
drbd_al_complete_io(device, &req->i);
put_ldev(device);
} else if (drbd_ratelimit()) {
drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
"but my Disk seems to have failed :(\n",
(unsigned long long) req->i.sector, req->i.size);
}
}
}
mempool_free(req, &drbd_request_mempool);
}
static void wake_all_senders(struct drbd_connection *connection)
{
wake_up(&connection->sender_work.q_wait);
}
/*
 * Close the current transfer-log epoch of @connection if it contains any
 * writes: reset the write counter, bump the epoch number and wake the
 * senders.  An epoch without writes is left untouched.
 */
void start_new_tl_epoch(struct drbd_connection *connection)
{
	if (connection->current_tle_writes != 0) {
		connection->current_tle_writes = 0;
		atomic_inc(&connection->current_tle_nr);
		wake_all_senders(connection);
	}
}
void complete_master_bio(struct drbd_device *device,
struct bio_and_error *m)
{
if (unlikely(m->error))
m->bio->bi_status = errno_to_blk_status(m->error);
bio_endio(m->bio);
dec_ap_bio(device);
}
/*
 * Decide how the master bio of @req is completed.
 *
 * Logs a "Logic BUG" and returns early if the request still has
 * un-aborted local I/O pending, is queued or pending on the network, is
 * completion-suspended, or has no master bio.
 * Otherwise: closes the current transfer-log epoch for writes belonging to
 * it, ends I/O accounting, and either marks a failed READ as RQ_POSTPONED
 * or hands the master bio and its error code back to the caller through @m.
 * In both cases the request is taken off its pending_master_completion list.
 */
static
void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
{
const unsigned s = req->rq_state;
struct drbd_device *device = req->device;
int error, ok;
if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
(s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
(s & RQ_COMPLETION_SUSP)) {
drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
return;
}
if (!req->master_bio) {
drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
return;
}
/* Success on either the local or the network side counts as success. */
ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
/* NOTE(review): presumably private_bio holds an ERR_PTR here after a failed local I/O - confirm against the endio handler. */
error = PTR_ERR(req->private_bio);
/* A write in the still-current epoch closes that epoch. */
if (op_is_write(bio_op(req->master_bio)) &&
req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
start_new_tl_epoch(first_peer_device(device)->connection);
bio_end_io_acct(req->master_bio, req->start_jif);
/* Failed non-readahead READ that is still on a tl_requests list: flag it as postponed instead of completing it. */
if (!ok &&
bio_op(req->master_bio) == REQ_OP_READ &&
!(req->master_bio->bi_opf & REQ_RAHEAD) &&
!list_empty(&req->tl_requests))
req->rq_state |= RQ_POSTPONED;
/* Not postponed: pass bio and error to the caller and detach the bio. */
if (!(req->rq_state & RQ_POSTPONED)) {
m->error = ok ? 0 : (error ?: -EIO);
m->bio = req->master_bio;
req->master_bio = NULL;
req->i.completed = true;
}
if (req->i.waiting)
wake_up(&device->misc_wait);
list_del_init(&req->req_pending_master_completion);
}
/*
 * Drop @put completion references from @req.
 *
 * When the count reaches zero the request is completed via
 * drbd_req_complete().  Afterwards the request is left alone if it is
 * RQ_LOCAL_ABORTED, handed to drbd_restart_request() if it is
 * RQ_POSTPONED, and otherwise loses one kref.
 */
static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
{
	struct drbd_device *device = req->device;

	D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));

	/* Nothing to drop, or other completion references remain. */
	if (put == 0 || !atomic_sub_and_test(put, &req->completion_ref))
		return;

	drbd_req_complete(req, m);

	if (req->rq_state & RQ_LOCAL_ABORTED)
		return;

	if (req->rq_state & RQ_POSTPONED)
		drbd_restart_request(req);
	else
		kref_put(&req->kref, drbd_req_destroy);
}
/*
 * Record @req as the connection's req_next, but only if that pointer is
 * currently NULL.  Does nothing without a peer device or connection.
 */
static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *conn;

	if (!peer_device)
		return;

	conn = peer_device->connection;
	if (conn && !conn->req_next)
		conn->req_next = req;
}
/*
 * If the connection's req_next currently points at @req, move it on to the
 * next request after @req on the transfer log that has RQ_NET_QUEUED set,
 * or to NULL if there is none.
 */
static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *conn = peer_device ? peer_device->connection : NULL;
	struct drbd_request *cursor = req;
	struct drbd_request *found = NULL;

	if (!conn || conn->req_next != req)
		return;

	/* Continue the walk right behind @req. */
	list_for_each_entry_continue(cursor, &conn->transfer_log, tl_requests) {
		const unsigned int state = cursor->rq_state;

		if (state & RQ_NET_QUEUED) {
			found = cursor;
			break;
		}
	}

	conn->req_next = found;
}
/*
 * Record @req as the connection's req_ack_pending, but only if that
 * pointer is currently NULL.  Does nothing without a peer device or
 * connection.
 */
static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *conn;

	if (!peer_device)
		return;

	conn = peer_device->connection;
	if (conn && !conn->req_ack_pending)
		conn->req_ack_pending = req;
}
/*
 * If the connection's req_ack_pending currently points at @req, move it on
 * to the next request after @req on the transfer log that is both
 * RQ_NET_SENT and RQ_NET_PENDING, or to NULL if there is none.
 */
static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *conn = peer_device ? peer_device->connection : NULL;
	struct drbd_request *cursor = req;
	struct drbd_request *found = NULL;

	if (!conn || conn->req_ack_pending != req)
		return;

	/* Continue the walk right behind @req. */
	list_for_each_entry_continue(cursor, &conn->transfer_log, tl_requests) {
		const unsigned int state = cursor->rq_state;

		if ((state & RQ_NET_SENT) && (state & RQ_NET_PENDING)) {
			found = cursor;
			break;
		}
	}

	conn->req_ack_pending = found;
}
/*
 * Record @req as the connection's req_not_net_done, but only if that
 * pointer is currently NULL.  Does nothing without a peer device or
 * connection.
 */
static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *conn;

	if (!peer_device)
		return;

	conn = peer_device->connection;
	if (conn && !conn->req_not_net_done)
		conn->req_not_net_done = req;
}
/*
 * If the connection's req_not_net_done currently points at @req, move it
 * on to the next request after @req on the transfer log that is
 * RQ_NET_SENT but not yet RQ_NET_DONE, or to NULL if there is none.
 */
static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *conn = peer_device ? peer_device->connection : NULL;
	struct drbd_request *cursor = req;
	struct drbd_request *found = NULL;

	if (!conn || conn->req_not_net_done != req)
		return;

	/* Continue the walk right behind @req. */
	list_for_each_entry_continue(cursor, &conn->transfer_log, tl_requests) {
		const unsigned int state = cursor->rq_state;

		if ((state & RQ_NET_SENT) && !(state & RQ_NET_DONE)) {
			found = cursor;
			break;
		}
	}

	conn->req_not_net_done = found;
}
/*
 * Apply a state transition to @req: clear the bits in @clear, set the bits
 * in @set, and adjust reference counts and per-connection cache pointers
 * according to which bits actually changed.
 *
 * @m is passed through to drbd_req_put_completion_ref(), which may fill it
 * with the master bio to complete.
 *
 * The function takes a temporary kref for its own duration, so the request
 * stays valid until the final kref_put() at the end, which may free it.
 */
static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
int clear, int set)
{
struct drbd_device *device = req->device;
struct drbd_peer_device *peer_device = first_peer_device(device);
/* State before the change; transitions below are detected against it. */
unsigned s = req->rq_state;
/* Number of completion references to drop at the end. */
int c_put = 0;
/* While the device is suspended, add RQ_COMPLETION_SUSP unless it is already set or being cleared. */
if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
set |= RQ_COMPLETION_SUSP;
/* Apply the change. */
req->rq_state &= ~clear;
req->rq_state |= set;
/* Nothing changed: nothing to account for. */
if (req->rq_state == s)
return;
/* --- Newly set bits: take references. --- */
kref_get(&req->kref);
if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
atomic_inc(&req->completion_ref);
if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
inc_ap_pending(device);
atomic_inc(&req->completion_ref);
}
if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
atomic_inc(&req->completion_ref);
set_if_null_req_next(peer_device, req);
}
/* Extra kref, dropped again below once RQ_NET_DONE gets set. */
if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
kref_get(&req->kref);
if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
/* Only count as in flight if not already RQ_NET_DONE. */
if (!(s & RQ_NET_DONE)) {
atomic_add(req->i.size >> 9, &device->ap_in_flight);
set_if_null_req_not_net_done(peer_device, req);
}
if (req->rq_state & RQ_NET_PENDING)
set_if_null_req_ack_pending(peer_device, req);
}
if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
atomic_inc(&req->completion_ref);
/* --- Cleared bits / progress: schedule references to drop. --- */
if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
++c_put;
if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
++c_put;
}
if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
/* For an aborted request a kref is dropped here instead of a completion reference. */
if (req->rq_state & RQ_LOCAL_ABORTED)
kref_put(&req->kref, drbd_req_destroy);
else
++c_put;
list_del_init(&req->req_pending_local);
}
if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
dec_ap_pending(device);
++c_put;
req->acked_jif = jiffies;
advance_conn_req_ack_pending(peer_device, req);
}
if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
++c_put;
advance_conn_req_next(peer_device, req);
}
if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
if (s & RQ_NET_SENT)
atomic_sub(req->i.size >> 9, &device->ap_in_flight);
/* Drops the kref taken when RQ_EXP_BARR_ACK was set. */
if (s & RQ_EXP_BARR_ACK)
kref_put(&req->kref, drbd_req_destroy);
req->net_done_jif = jiffies;
/* Make sure none of the connection's cache pointers still reference this request. */
advance_conn_req_next(peer_device, req);
advance_conn_req_ack_pending(peer_device, req);
advance_conn_req_not_net_done(peer_device, req);
}
if (req->i.waiting)
wake_up(&device->misc_wait);
/* May complete the request; then drop the temporary kref from above. */
drbd_req_put_completion_ref(req, m, c_put);
kref_put(&req->kref, drbd_req_destroy);
}
/*
 * Emit a rate-limited warning about a failed local I/O of @req, naming the
 * direction, the start sector, the size in 512-byte units and the backing
 * block device.
 */
static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
{
	if (drbd_ratelimit()) {
		const char *direction = (req->rq_state & RQ_WRITE) ? "WRITE" : "READ";

		drbd_warn(device, "local %s IO error sector %llu+%u on %pg\n",
			  direction,
			  (unsigned long long)req->i.sector,
			  req->i.size >> 9,
			  device->ldev->backing_bdev);
	}
}
/*
 * True if @req is a write that is still network-pending and expects
 * neither a write ack nor a receive ack.
 */
static inline bool is_pending_write_protocol_A(struct drbd_request *req)
{
	const unsigned long examined =
		RQ_WRITE | RQ_NET_PENDING | RQ_EXP_WRITE_ACK | RQ_EXP_RECEIVE_ACK;
	const unsigned long expected = RQ_WRITE | RQ_NET_PENDING;

	return (req->rq_state & examined) == expected;
}
/*
 * Request state machine: react to event @what for request @req.
 *
 * @peer_device supplies the connection used by the events that queue work
 * or read net_conf; it may be NULL, in which case the local `connection`
 * is NULL as well.
 * @m, if non-NULL, has its bio pointer reset to NULL up front and may be
 * filled in by mod_rq_state() with a master bio to complete.
 *
 * Returns 0, except for RESTART_FROZEN_DISK_IO and RESEND, which may return
 * MR_READ or MR_WRITE.
 */
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
struct drbd_peer_device *peer_device,
struct bio_and_error *m)
{
struct drbd_device *const device = req->device;
struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
struct net_conf *nc;
int p, rv = 0;
if (m)
m->bio = NULL;
switch (what) {
default:
drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
break;
case TO_BE_SENT:
D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
/* Record which kind of ack the configured wire protocol makes us expect. */
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
p = nc->wire_protocol;
rcu_read_unlock();
req->rq_state |=
p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
mod_rq_state(req, m, 0, RQ_NET_PENDING);
break;
case TO_BE_SUBMITTED:
D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
break;
case COMPLETED_OK:
/* Account the transferred size (in 512-byte units) per direction. */
if (req->rq_state & RQ_WRITE)
device->writ_cnt += req->i.size >> 9;
else
device->read_cnt += req->i.size >> 9;
mod_rq_state(req, m, RQ_LOCAL_PENDING,
RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
break;
case ABORT_DISK_IO:
mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
break;
case WRITE_COMPLETED_WITH_ERROR:
drbd_report_io_error(device, req);
__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
break;
case READ_COMPLETED_WITH_ERROR:
drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
drbd_report_io_error(device, req);
__drbd_chk_io_error(device, DRBD_READ_ERROR);
fallthrough;
case READ_AHEAD_COMPLETED_WITH_ERROR:
/* Local completion without RQ_LOCAL_OK. */
mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
break;
case DISCARD_COMPLETED_NOTSUPP:
case DISCARD_COMPLETED_WITH_ERROR:
mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
break;
case QUEUE_FOR_NET_READ:
/* Register the interval in the read tree, then queue the send work. */
D_ASSERT(device, drbd_interval_empty(&req->i));
drbd_insert_interval(&device->read_requests, &req->i);
set_bit(UNPLUG_REMOTE, &device->flags);
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
req->w.cb = w_send_read_req;
drbd_queue_work(&connection->sender_work,
&req->w);
break;
case QUEUE_FOR_NET_WRITE:
/* Register the interval in the write tree, then queue the send work. */
D_ASSERT(device, drbd_interval_empty(&req->i));
drbd_insert_interval(&device->write_requests, &req->i);
set_bit(UNPLUG_REMOTE, &device->flags);
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
req->w.cb = w_send_dblock;
drbd_queue_work(&connection->sender_work,
&req->w);
/* Close the epoch once it holds max_epoch_size writes. */
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
p = nc->max_epoch_size;
rcu_read_unlock();
if (connection->current_tle_writes >= p)
start_new_tl_epoch(connection);
break;
case QUEUE_FOR_SEND_OOS:
mod_rq_state(req, m, 0, RQ_NET_QUEUED);
req->w.cb = w_send_out_of_sync;
drbd_queue_work(&connection->sender_work,
&req->w);
break;
case READ_RETRY_REMOTE_CANCELED:
case SEND_CANCELED:
case SEND_FAILED:
/* The request is no longer queued for the network. */
mod_rq_state(req, m, RQ_NET_QUEUED, 0);
break;
case HANDED_OVER_TO_NETWORK:
/* A pending write that expects no ack is treated as OK as soon as it is sent. */
if (is_pending_write_protocol_A(req))
mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
RQ_NET_SENT|RQ_NET_OK);
else
mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
break;
case OOS_HANDED_TO_NETWORK:
mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
break;
case CONNECTION_LOST_WHILE_PENDING:
mod_rq_state(req, m,
RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
RQ_NET_DONE);
break;
case CONFLICT_RESOLVED:
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
break;
case WRITE_ACKED_BY_PEER_AND_SIS:
req->rq_state |= RQ_NET_SIS;
fallthrough;
case WRITE_ACKED_BY_PEER:
/* Skips the RQ_EXP_RECEIVE_ACK assertion of the next case. */
goto ack_common;
case RECV_ACKED_BY_PEER:
D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
ack_common:
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
break;
case POSTPONE_WRITE:
D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
/* Flag is set directly, without going through mod_rq_state(). */
req->rq_state |= RQ_POSTPONED;
if (req->i.waiting)
wake_up(&device->misc_wait);
break;
case NEG_ACKED:
mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
break;
case FAIL_FROZEN_DISK_IO:
/* Only requests whose local I/O has completed are affected. */
if (!(req->rq_state & RQ_LOCAL_COMPLETED))
break;
mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
break;
case RESTART_FROZEN_DISK_IO:
/* Only requests whose local I/O has completed are affected. */
if (!(req->rq_state & RQ_LOCAL_COMPLETED))
break;
mod_rq_state(req, m,
RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
RQ_LOCAL_PENDING);
rv = MR_READ;
if (bio_data_dir(req->master_bio) == WRITE)
rv = MR_WRITE;
/* NOTE(review): the get_ldev() result is ignored here - presumably it cannot fail on this path; confirm. */
get_ldev(device);
req->w.cb = w_restart_disk_io;
drbd_queue_work(&connection->sender_work,
&req->w);
break;
case RESEND:
/* Reads without a work callback are just released from suspension. */
if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
break;
}
/* Not yet acknowledged: queue it for the network again. */
if (!(req->rq_state & RQ_NET_OK)) {
mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
if (req->w.cb) {
drbd_queue_work(&connection->sender_work,
&req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
}
break;
}
/* Already RQ_NET_OK: handle it like a barrier ack. */
fallthrough;
case BARRIER_ACKED:
/* Non-write requests are ignored here. */
if (!(req->rq_state & RQ_WRITE))
break;
if (req->rq_state & RQ_NET_PENDING) {
drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
}
/* Only requests with some network state become RQ_NET_DONE. */
mod_rq_state(req, m, RQ_COMPLETION_SUSP,
(req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
break;
case DATA_RECEIVED:
D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
break;
case QUEUE_AS_DRBD_BARRIER:
start_new_tl_epoch(connection);
mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
break;
}
return rv;
}
/*
 * Decide whether a read of @size bytes starting at @sector can be served
 * from the local disk.
 *
 * Always true for a D_UP_TO_DATE disk, always false for any disk state
 * other than D_INCONSISTENT.  For a D_INCONSISTENT disk the read is allowed
 * only if no bitmap bit is set in the covered range.
 */
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
{
	unsigned long first_bit, last_bit;
	sector_t last_sector, capacity;

	if (device->state.disk == D_UP_TO_DATE)
		return true;
	if (device->state.disk != D_INCONSISTENT)
		return false;

	last_sector = sector + (size >> 9) - 1;
	capacity = get_capacity(device->vdisk);

	/* Both ends of the range are expected to lie within the device. */
	D_ASSERT(device, sector < capacity);
	D_ASSERT(device, last_sector < capacity);

	first_bit = BM_SECT_TO_BIT(sector);
	last_bit = BM_SECT_TO_BIT(last_sector);

	return drbd_bm_count_bits(device, first_bit, last_bit) == 0;
}
/*
 * Apply read-balancing policy @rbm and report whether the read starting at
 * @sector should go to the peer (true) or stay local (false).
 */
static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
		enum drbd_read_balancing rbm)
{
	switch (rbm) {
	case RB_PREFER_REMOTE:
		return true;

	case RB_ROUND_ROBIN:
		/* Flip the flag on every call; its previous value decides. */
		return test_and_change_bit(READ_BALANCE_RR, &device->flags);

	case RB_LEAST_PENDING: {
		const int local_load = atomic_read(&device->local_cnt);
		const int remote_load = atomic_read(&device->ap_pending_cnt) +
					atomic_read(&device->rs_pending_cnt);

		return local_load > remote_load;
	}

	case RB_32K_STRIPING:
	case RB_64K_STRIPING:
	case RB_128K_STRIPING:
	case RB_256K_STRIPING:
	case RB_512K_STRIPING:
	case RB_1M_STRIPING: {
		/* Shift of 15 for RB_32K_STRIPING, one more per following enum value. */
		const int stripe_shift = (rbm - RB_32K_STRIPING + 15);

		/* Alternate between local and remote per stripe. */
		return (sector >> (stripe_shift - 9)) & 1;
	}

	case RB_CONGESTED_REMOTE:
	case RB_PREFER_LOCAL:
	default:
		return false;
	}
}
/*
 * Wait until no not-yet-completed interval in device->write_requests
 * overlaps the range of @req.
 *
 * The function releases and re-takes device->resource->req_lock around
 * schedule(), so it presumably expects the caller to hold that lock on
 * entry; it is held again on return.
 */
static void complete_conflicting_writes(struct drbd_request *req)
{
DEFINE_WAIT(wait);
struct drbd_device *device = req->device;
struct drbd_interval *i;
sector_t sector = req->i.sector;
int size = req->i.size;
for (;;) {
/* Find the first overlapping interval that is not marked completed. */
drbd_for_each_overlap(i, &device->write_requests, sector, size) {
if (i->completed)
continue;
break;
}
/* No such overlap left: done waiting. */
if (!i)
break;
/* Ask to be woken via misc_wait, then sleep with the lock dropped. */
prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
i->waiting = true;
spin_unlock_irq(&device->resource->req_lock);
schedule();
spin_lock_irq(&device->resource->req_lock);
/* The tree walk restarts from scratch on the next iteration. */
}
finish_wait(&device->misc_wait, &wait);
}
static void maybe_pull_ahead(struct drbd_device *device)
{
struct drbd_connection *connection = first_peer_device(device)->connection;
struct net_conf *nc;
bool congested = false;
enum drbd_on_congestion on_congestion;
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
on_congestion = nc ? nc->on_congestion : OC_BLOCK;
rcu_read_unlock();
if (on_congestion == OC_BLOCK ||
connection->agreed_pro_version < 96)
return;
if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
return;
if (!get_ldev_if_state(device, D_UP_TO_DATE))
return;
if (nc->cong_fill &&
atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
drbd_info(device, "Congestion-fill threshold reached\n");
congested = true;
}
if (device->act_log->used >= nc->cong_extents) {
drbd_info(device, "Congestion-extents threshold reached\n");
congested = true;
}
if (congested) {
start_new_tl_epoch(first_peer_device(device)->connection);
if (on_congestion == OC_PULL_AHEAD)
_drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
else
_drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
}
put_ldev(device);
}
static bool do_remote_read(struct drbd_request *req)
{
struct drbd_device *device = req->device;
enum drbd_read_balancing rbm;
if (req->private_bio) {
if (!drbd_may_do_local_read(device,
req->i.sector, req->i.size)) {
bio_put(req->private_bio);
req->private_bio = NULL;
put_ldev(device);
}
}
if (device->state.pdsk != D_UP_TO_DATE)
return false;
if (req->private_bio == NULL)
return true;
rcu_read_lock();
rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
rcu_read_unlock();
if (rbm == RB_PREFER_LOCAL && req->private_bio)
return false;
if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
if (req->private_bio) {
bio_put(req->private_bio);
req->private_bio = NULL;
put_ldev(device);
}
return true;
}
return false;
}
bool drbd_should_do_remote(union drbd_dev_state s)
{
return s.pdsk == D_UP_TO_DATE ||
(s.pdsk >= D_INCONSISTENT &&
s.conn >= C_WF_BITMAP_T &&
s.conn < C_AHEAD);
}
static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
{
return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
}
/*
 * Route the write @req towards the peer, as far as the current device
 * state asks for it.
 *
 * Zero-sized requests are queued as a DRBD barrier when the peer is to be
 * written to.  Other requests are either queued as a network write, or
 * marked out of sync and, if drbd_set_out_of_sync() returns non-zero,
 * queued for sending out-of-sync information.
 *
 * Returns non-zero if the request was handed to the replication path.
 */
static int drbd_process_write_request(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct drbd_peer_device *peer_device = first_peer_device(device);
	const int remote = drbd_should_do_remote(device->state);
	const int send_oos = drbd_should_send_out_of_sync(device->state);

	if (unlikely(req->i.size == 0)) {
		/* A zero-sized request is expected to carry REQ_PREFLUSH. */
		D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
		if (remote)
			_req_mod(req, QUEUE_AS_DRBD_BARRIER, peer_device);
		return remote;
	}

	if (!(remote || send_oos))
		return 0;

	D_ASSERT(device, !(remote && send_oos));

	if (!remote) {
		if (drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size))
			_req_mod(req, QUEUE_FOR_SEND_OOS, peer_device);
		return remote;
	}

	_req_mod(req, TO_BE_SENT, peer_device);
	_req_mod(req, QUEUE_FOR_NET_WRITE, peer_device);
	return remote;
}
static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
{
int err = drbd_issue_discard_or_zero_out(req->device,
req->i.sector, req->i.size >> 9, flags);
if (err)
req->private_bio->bi_status = BLK_STS_IOERR;
bio_endio(req->private_bio);
}
/*
 * Submit the private bio of @req to the backing device.
 *
 * The bio is failed with an I/O error if no ldev reference can be taken or
 * if fault injection triggers.  WRITE_ZEROES and DISCARD are handled via
 * drbd_process_discard_or_zeroes_req(); everything else goes through
 * submit_bio_noacct().
 */
static void
drbd_submit_req_private_bio(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct bio *bio = req->private_bio;
	unsigned int fault_type;

	/* Fault-injection class: write, read-ahead or plain read. */
	if (bio_op(bio) != REQ_OP_READ)
		fault_type = DRBD_FAULT_DT_WR;
	else
		fault_type = (bio->bi_opf & REQ_RAHEAD) ? DRBD_FAULT_DT_RA
							 : DRBD_FAULT_DT_RD;

	if (!get_ldev(device)) {
		bio_io_error(bio);
		return;
	}

	if (drbd_insert_fault(device, fault_type)) {
		bio_io_error(bio);
	} else {
		switch (bio_op(bio)) {
		case REQ_OP_WRITE_ZEROES:
			/* Allow trimming as well unless REQ_NOUNMAP is set. */
			drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
			    ((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
			break;
		case REQ_OP_DISCARD:
			drbd_process_discard_or_zeroes_req(req, EE_TRIM);
			break;
		default:
			submit_bio_noacct(bio);
			break;
		}
	}

	put_ldev(device);
}
static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
{
spin_lock_irq(&device->resource->req_lock);
list_add_tail(&req->tl_requests, &device->submit.writes);
list_add_tail(&req->req_pending_master_completion,
&device->pending_master_completion[1 ]);
spin_unlock_irq(&device->resource->req_lock);
queue_work(device->submit.wq, &device->submit.worker);
wake_up(&device->al_wait);
}
static struct drbd_request *
drbd_request_prepare(struct drbd_device *device, struct bio *bio)
{
const int rw = bio_data_dir(bio);
struct drbd_request *req;
req = drbd_req_new(device, bio);
if (!req) {
dec_ap_bio(device);
drbd_err(device, "could not kmalloc() req\n");
bio->bi_status = BLK_STS_RESOURCE;
bio_endio(bio);
return ERR_PTR(-ENOMEM);
}
req->start_jif = bio_start_io_acct(req->master_bio);
if (get_ldev(device)) {
req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
bio, GFP_NOIO,
&drbd_io_bio_set);
req->private_bio->bi_private = req;
req->private_bio->bi_end_io = drbd_request_endio;
}
if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
bio_op(bio) == REQ_OP_DISCARD)
goto queue_for_submitter_thread;
if (rw == WRITE && req->private_bio && req->i.size
&& !test_bit(AL_SUSPENDED, &device->flags)) {
if (!drbd_al_begin_io_fastpath(device, &req->i))
goto queue_for_submitter_thread;
req->rq_state |= RQ_IN_ACT_LOG;
req->in_actlog_jif = jiffies;
}
return req;
queue_for_submitter_thread:
atomic_inc(&device->ap_actlog_cnt);
drbd_queue_write(device, req);
return NULL;
}
static bool may_do_writes(struct drbd_device *device)
{
const union drbd_dev_state s = device->state;
return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
}
/*
 * Per-plug callback state: the generic block-layer plug callback plus the
 * request most recently registered through drbd_update_plug().
 */
struct drbd_plug_cb {
/* Embedded generic callback; drbd_unplug() gets back to this struct via container_of(). */
struct blk_plug_cb cb;
/* Holds a kref on the request while set (taken in drbd_update_plug()). */
struct drbd_request *most_recent_req;
};
/*
 * Plug callback: free the callback object and, if a request was recorded
 * on it, flag that request with RQ_UNPLUG, queue an unplug for its device
 * and drop the kref that was held for the plug, all under req_lock.
 */
static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
{
	struct drbd_plug_cb *plug = container_of(cb, struct drbd_plug_cb, cb);
	struct drbd_resource *resource = plug->cb.data;
	struct drbd_request *last_req = plug->most_recent_req;

	/* Everything needed was copied out above; the callback can go. */
	kfree(cb);

	if (last_req) {
		spin_lock_irq(&resource->req_lock);
		last_req->rq_state |= RQ_UNPLUG;
		drbd_queue_unplug(last_req->device);
		kref_put(&last_req->kref, drbd_req_destroy);
		spin_unlock_irq(&resource->req_lock);
	}
}
/*
 * Ask the block layer for the plug callback registered for drbd_unplug and
 * @resource.  Returns the enclosing drbd_plug_cb, or NULL if
 * blk_check_plugged() returned NULL.
 */
static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
{
	struct blk_plug_cb *cb;

	cb = blk_check_plugged(drbd_unplug, resource, sizeof(struct drbd_plug_cb));

	return cb ? container_of(cb, struct drbd_plug_cb, cb) : NULL;
}
/*
 * Make @req the most recent request of @plug: take a kref on @req, store
 * it, and drop the kref of the request it replaces, if any.
 */
static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
{
	struct drbd_request *previous = plug->most_recent_req;

	/* Take the new reference before dropping the old one. */
	kref_get(&req->kref);
	plug->most_recent_req = req;

	if (!previous)
		return;

	kref_put(&previous->kref, drbd_req_destroy);
}
/*
 * Process one prepared request: register it on the transfer log, queue it
 * for the peer and/or mark it for local submission, then drop the initial
 * completion reference.
 *
 * Everything up to and including drbd_req_put_completion_ref() runs under
 * resource->req_lock; the private bio is submitted and a resulting master
 * bio is completed only after the lock has been released.
 */
static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
{
struct drbd_resource *resource = device->resource;
struct drbd_peer_device *peer_device = first_peer_device(device);
const int rw = bio_data_dir(req->master_bio);
struct bio_and_error m = { NULL, };
bool no_remote = false;
bool submit_private_bio = false;
spin_lock_irq(&resource->req_lock);
if (rw == WRITE) {
/* May drop and re-take req_lock while waiting for overlapping writes. */
complete_conflicting_writes(req);
maybe_pull_ahead(device);
}
/* Suspended: postpone the request and give up the local bio. */
if (drbd_suspended(device)) {
req->rq_state |= RQ_POSTPONED;
if (req->private_bio) {
bio_put(req->private_bio);
req->private_bio = NULL;
put_ldev(device);
}
goto out;
}
/* A read that can be served neither remotely nor locally fails right away, before it is put on any list. */
if (rw != WRITE) {
if (!do_remote_read(req) && !req->private_bio)
goto nodata;
}
/* Remember the transfer-log epoch this request belongs to. */
req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);
/* Only non-empty requests are added to the transfer log. */
if (likely(req->i.size!=0)) {
if (rw == WRITE)
first_peer_device(device)->connection->current_tle_writes++;
list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
}
if (rw == WRITE) {
/* Neither disk is up to date: drop the local bio and fail. */
if (req->private_bio && !may_do_writes(device)) {
bio_put(req->private_bio);
req->private_bio = NULL;
put_ldev(device);
goto nodata;
}
if (!drbd_process_write_request(req))
no_remote = true;
} else {
/* Read without a local bio: send it to the peer. */
if (req->private_bio == NULL) {
_req_mod(req, TO_BE_SENT, peer_device);
_req_mod(req, QUEUE_FOR_NET_READ, peer_device);
} else
no_remote = true;
}
/* Requests going to the peer are recorded on the current plug, if any. */
if (no_remote == false) {
struct drbd_plug_cb *plug = drbd_check_plugged(resource);
if (plug)
drbd_update_plug(plug, req);
}
/* Add to pending_master_completion unless it is already on such a list. */
if (list_empty(&req->req_pending_master_completion))
list_add_tail(&req->req_pending_master_completion,
&device->pending_master_completion[rw == WRITE]);
if (req->private_bio) {
req->pre_submit_jif = jiffies;
list_add_tail(&req->req_pending_local,
&device->pending_completion[rw == WRITE]);
_req_mod(req, TO_BE_SUBMITTED, NULL);
/* The actual submission happens after the lock is released. */
submit_private_bio = true;
} else if (no_remote) {
/* Also the jump target of the two "goto nodata" statements above. */
nodata:
if (drbd_ratelimit())
drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
(unsigned long long)req->i.sector, req->i.size >> 9);
}
out:
/* Drop the completion reference the request was created with. */
drbd_req_put_completion_ref(req, &m, 1);
spin_unlock_irq(&resource->req_lock);
/* Uses the local flag rather than re-reading req->private_bio. */
if (submit_private_bio)
drbd_submit_req_private_bio(req);
if (m.bio)
complete_master_bio(device, &m);
}
/*
 * Prepare a request for @bio and, unless preparation returned NULL or an
 * error pointer, send and submit it right away.
 */
void __drbd_make_request(struct drbd_device *device, struct bio *bio)
{
	struct drbd_request *req = drbd_request_prepare(device, bio);

	if (!IS_ERR_OR_NULL(req))
		drbd_send_and_submit(device, req);
}
/*
 * Walk @incoming and submit every request that does not need an
 * activity-log transaction, or that gets its slot via the fast path.
 * Requests for which the fast path does not succeed stay on @incoming.
 */
static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
{
	struct blk_plug plug;
	struct drbd_request *req, *next;

	blk_start_plug(&plug);
	list_for_each_entry_safe(req, next, incoming, tl_requests) {
		const bool needs_al_slot =
			bio_data_dir(req->master_bio) == WRITE &&
			req->private_bio && req->i.size &&
			!test_bit(AL_SUSPENDED, &device->flags);

		if (needs_al_slot) {
			/* No slot on the fast path: leave it on the list. */
			if (!drbd_al_begin_io_fastpath(device, &req->i))
				continue;

			req->rq_state |= RQ_IN_ACT_LOG;
			req->in_actlog_jif = jiffies;
			atomic_dec(&device->ap_actlog_cnt);
		}

		list_del_init(&req->tl_requests);
		drbd_send_and_submit(device, req);
	}
	blk_finish_plug(&plug);
}
static bool prepare_al_transaction_nonblock(struct drbd_device *device,
struct list_head *incoming,
struct list_head *pending,
struct list_head *later)
{
struct drbd_request *req;
int wake = 0;
int err;
spin_lock_irq(&device->al_lock);
while ((req = list_first_entry_or_null(incoming, struct drbd_request, tl_requests))) {
err = drbd_al_begin_io_nonblock(device, &req->i);
if (err == -ENOBUFS)
break;
if (err == -EBUSY)
wake = 1;
if (err)
list_move_tail(&req->tl_requests, later);
else
list_move_tail(&req->tl_requests, pending);
}
spin_unlock_irq(&device->al_lock);
if (wake)
wake_up(&device->al_wait);
return !list_empty(pending);
}
/*
 * Drain @pending: mark each request as being in the activity log, take it
 * off the list and send/submit it, all inside one block-layer plug.
 */
static void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
{
	struct blk_plug plug;
	struct drbd_request *req;

	blk_start_plug(&plug);
	while (!list_empty(pending)) {
		req = list_first_entry(pending, struct drbd_request, tl_requests);

		req->rq_state |= RQ_IN_ACT_LOG;
		req->in_actlog_jif = jiffies;
		atomic_dec(&device->ap_actlog_cnt);

		list_del_init(&req->tl_requests);
		drbd_send_and_submit(device, req);
	}
	blk_finish_plug(&plug);
}
/*
 * Work function of the submitter: process the writes queued on
 * device->submit.writes.
 *
 * Requests are first tried on the activity-log fast path.  What is left is
 * collected into an activity-log transaction (sleeping on al_wait until at
 * least one request could be prepared), the transaction is committed, and
 * the prepared requests are sent and submitted.  This repeats until no
 * incoming request is left.
 */
void do_submit(struct work_struct *ws)
{
struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
/* Requests taken over from device->submit.writes. */
LIST_HEAD(incoming);
/* Requests with a prepared AL slot, submitted after the commit. */
LIST_HEAD(pending);
/* Requests for which drbd_al_begin_io_nonblock() returned an error other than -ENOBUFS. */
LIST_HEAD(busy);
spin_lock_irq(&device->resource->req_lock);
list_splice_tail_init(&device->submit.writes, &incoming);
spin_unlock_irq(&device->resource->req_lock);
for (;;) {
DEFINE_WAIT(wait);
/* Retry formerly busy requests first: they go to the front. */
list_splice_init(&busy, &incoming);
submit_fast_path(device, &incoming);
if (list_empty(&incoming))
break;
/* Sleep on al_wait until at least one request could be prepared. */
for (;;) {
prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
list_splice_init(&busy, &incoming);
prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
if (!list_empty(&pending))
break;
schedule();
/* Something is still on incoming: retry without fetching new requests. */
if (!list_empty(&incoming))
continue;
/* Everything moved to busy: fetch newly queued writes. */
spin_lock_irq(&device->resource->req_lock);
list_splice_tail_init(&device->submit.writes, &incoming);
spin_unlock_irq(&device->resource->req_lock);
}
finish_wait(&device->al_wait, &wait);
/* If all incoming were handled, try to fit newly queued writes into the same transaction, without blocking. */
while (list_empty(&incoming)) {
LIST_HEAD(more_pending);
LIST_HEAD(more_incoming);
bool made_progress;
/* Unlocked peek at the list; it is re-checked under the lock below. */
if (list_empty(&device->submit.writes))
break;
spin_lock_irq(&device->resource->req_lock);
list_splice_tail_init(&device->submit.writes, &more_incoming);
spin_unlock_irq(&device->resource->req_lock);
if (list_empty(&more_incoming))
break;
made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
list_splice_tail_init(&more_pending, &pending);
list_splice_tail_init(&more_incoming, &incoming);
if (!made_progress)
break;
}
/* Commit the transaction, then submit what it covers. */
drbd_al_begin_io_commit(device);
send_and_submit_pending(device, &pending);
}
}
/*
 * Entry point for a bio submitted to a DRBD device: split it to the queue
 * limits, count it as an application bio and pass it on to
 * __drbd_make_request().  Returns without further action if
 * bio_split_to_limits() returns NULL.
 */
void drbd_submit_bio(struct bio *bio)
{
	struct drbd_device *device = bio->bi_bdev->bd_disk->private_data;
	struct bio *split = bio_split_to_limits(bio);

	if (!split)
		return;

	/* The size is expected to be a multiple of 512 bytes. */
	D_ASSERT(device, IS_ALIGNED(split->bi_iter.bi_size, 512));

	inc_ap_bio(device);
	__drbd_make_request(device, split);
}
/*
 * Decide whether the network timeout @ent (in jiffies) has expired for
 * @net_req at time @now.
 *
 * Returns false while the request is younger than @ent or while @now lies
 * within @ent of the last reconnect.  Returns true if the request is still
 * RQ_NET_PENDING, or if the last sent barrier is older than @ent.  If the
 * request belongs to the epoch that is still being sent, only a warning is
 * logged and false is returned.
 *
 * @ko_count and @timeout are used for the log messages only.
 */
static bool net_timeout_reached(struct drbd_request *net_req,
		struct drbd_connection *connection,
		unsigned long now, unsigned long ent,
		unsigned int ko_count, unsigned int timeout)
{
	struct drbd_device *device = net_req->device;
	unsigned int waited_ms;

	if (!time_after(now, net_req->pre_send_jif + ent) ||
	    time_in_range(now, connection->last_reconnect_jif,
			  connection->last_reconnect_jif + ent))
		return false;

	waited_ms = jiffies_to_msecs(now - net_req->pre_send_jif);

	if (net_req->rq_state & RQ_NET_PENDING) {
		drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
			waited_ms, ko_count, timeout);
		return true;
	}

	/* The barrier closing this request's epoch has not been sent yet. */
	if (net_req->epoch == connection->send.current_epoch_nr) {
		drbd_warn(device,
			"We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
			waited_ms, ko_count, timeout);
		return false;
	}

	if (!time_after(now, connection->send.last_sent_barrier_jif + ent))
		return false;

	drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
		connection->send.last_sent_barrier_jif, now,
		jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
	return true;
}
/*
 * Timer callback watching over request timeouts of one device.
 *
 * Derives the effective network timeout (ko_count * timeout) and the disk
 * timeout from the current configuration.  If both are zero the function
 * returns without re-arming the timer.  Otherwise it inspects the oldest
 * pending local read and write and the oldest request waiting for the
 * peer, requests C_TIMEOUT on a network timeout, forces a detach on a disk
 * timeout, and re-arms the timer.
 */
void request_timer_fn(struct timer_list *t)
{
struct drbd_device *device = from_timer(device, t, request_timer);
struct drbd_connection *connection = first_peer_device(device)->connection;
struct drbd_request *req_read, *req_write, *req_peer;
struct net_conf *nc;
unsigned long oldest_submit_jif;
/* ent: network timeout, dt: disk timeout, et: the smaller non-zero one, nt: next expiry; all in jiffies. */
unsigned long ent = 0, dt = 0, et, nt;
unsigned long now;
unsigned int ko_count = 0, timeout = 0;
rcu_read_lock();
nc = rcu_dereference(connection->net_conf);
/* Network settings are only picked up from C_WF_REPORT_PARAMS on. */
if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
ko_count = nc->ko_count;
timeout = nc->timeout;
}
/* disk_timeout is scaled by HZ / 10, i.e. treated as tenths of a second. */
if (get_ldev(device)) {
dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
put_ldev(device);
}
rcu_read_unlock();
ent = timeout * HZ/10 * ko_count;
et = min_not_zero(dt, ent);
/* No timeout configured: return without re-arming the timer. */
if (!et)
return;
now = jiffies;
/* Default expiry, used when jumping straight to "out". */
nt = now + et;
spin_lock_irq(&device->resource->req_lock);
/* First entries of the local pending lists: index 0 for reads, 1 for writes. */
req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
/* Prefer the request waiting for an ack, else the one not yet net-done. */
req_peer = connection->req_ack_pending;
if (!req_peer)
req_peer = connection->req_not_net_done;
/* Ignore a peer request that belongs to a different device. */
if (req_peer && req_peer->device != device)
req_peer = NULL;
if (req_peer == NULL && req_write == NULL && req_read == NULL)
goto out;
/* Earliest pre_submit_jif of the two local requests; "now" if there is none. */
oldest_submit_jif =
(req_write && req_read)
? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
? req_write->pre_submit_jif : req_read->pre_submit_jif )
: req_write ? req_write->pre_submit_jif
: req_read ? req_read->pre_submit_jif : now;
if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);
/* Disk timeout: oldest local request is overdue and now is not within dt of the last reattach. */
if (dt && oldest_submit_jif != now &&
time_after(now, oldest_submit_jif + dt) &&
!time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
__drbd_chk_io_error(device, DRBD_FORCE_DETACH);
}
/* From here on ent and dt hold absolute expiry times; each falls back to now + et if its own deadline has passed or does not apply. */
ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
? req_peer->pre_send_jif + ent : now + et;
dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
? oldest_submit_jif + dt : now + et;
/* Re-arm for whichever of the two expires first. */
nt = time_before(ent, dt) ? ent : dt;
out:
spin_unlock_irq(&device->resource->req_lock);
mod_timer(&device->request_timer, nt);
}