5.10. ロック(spinlock, sleeplock)

「xv6 a simple, Unix-like teaching operating system」(リンク1)の4章「Locking」と5章「Scheduling」を読むとよい。 xv6にはスピンロックスリープロックの2種類のロックがあり、それぞれacquire関数とacquiresleep関数で取ることができる。 前者はロックが取れるまでプロセッサを占有するので短時間のロックに使用し、後者はロックが取れるまでプロセッサを明け渡すので長時間のロックに使用する。
ロックの解放にはrelease関数releasesleep関数をそれぞれ使用する。

スピンロック

このロックはロックが取れるまでプロセッサを明け渡さない。
spinlock構造体を使用する。 主に使用するのはlockedフィールドで、1のときロック済みであることを示す。 cpuフィールドやpcsフィールドはデバッグに使用する。

spinlock.h

struct spinlock {
  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};

acquire関数

この関数は、割り込みを無効にし、他のプロセッサがロックを解放するまで待ってからロックを取得する。 ロック取得後にspinlock構造体に自分のcpu構造体と、最大10個分のコールスタックを記録する。

まず現在のプロセッサで割り込みを禁止する。 もしもここで割り込みを禁止しなかった場合、あるロックを取得し、直後に割り込みが生じて同じロックを取得しようとした場合に割り込み側がロックが解放されるまで待つことになるが、ロックを持っている処理は進まないのでデッドロックとなる。
現在のプロセッサが既にそのロックを取得している場合はpanicする。
spinlock構造体のlockedフィールドを1にできるまで(ロックを取得できるまで)ループする。 ここがスピンロックの肝であり、xchg関数でlockedフィールドの独立性を保つ。
ロック取得前後の命令の実行順序が入れ替わらないように__sync_synchronize関数を呼び出す。 この関数はGCCのビルトイン関数で、「Using the GNU Compiler Collection (GCC)」 (リンク3)の「6.51 Legacy __sync Built-in Functions for Atomic Memory Access」によると、フルメモリバリアを行う関数である。 手元でコードを書きアセンブリ出力してみると、mfence命令にコンパイルされる。 mfence命令は、「Intel 64 and IA-32 architectures software developer's manual combined volumes: 1, 2A, 2B, 2C, 2D, 3A, 3B, 3C, 3D, and 4」(リンク8)によると、この命令の前にあるメモリを読み書きする命令は必ずこの命令よりも前に実行され、この命令の後にあるメモリを読み書きする命令は必ずこの命令よりも後に実行されることを保証する。 これはプロセッサのOut-Of-Order実行を抑止するという点で、コンパイラレベルの最適化を抑止するvolatile修飾子とは異なる。 Out-Of-Order実行やプロセッサの仕組みについては「プロセッサを支える技術」(書籍4)がとても詳しくて分かりやすい。
acquire関数は最後に、デバッグのための情報をspinlock構造体の中に残す。

spinlock.c

void
acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // The xchg is atomic.
  while(xchg(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();

  // Record info about lock acquisition for debugging.
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

pushcli関数とpopcli関数

割り込みの禁止は呼び出される関数によりネストされるので、これらの関数で管理する。

pushcli関数は現在のcpuの割り込みを禁止する。 この関数が実行される度にcpu構造体のncliフィールドがインクリメントされる。 関数呼び出し前に既に割り込み禁止としている場合があるため、ncliが0のときだけeflagsレジスタのInterrupt enableビット(9bit)を、cpu構造体のintenaフィールドに保存する。

popcli数はpushcli関数とは反対に、ncliフィールドをデクリメントする。 ncliフィールドとintenaフィールドに従って割り込みを有効化する。

spinlock.c

void
pushcli(void)
{
  int eflags;

  eflags = readeflags();
  cli();
  if(mycpu()->ncli == 0)
    mycpu()->intena = eflags & FL_IF;
  mycpu()->ncli += 1;
}

void
popcli(void)
{
  if(readeflags()&FL_IF)
    panic("popcli - interruptible");
  if(--mycpu()->ncli < 0)
    panic("popcli");
  if(mycpu()->ncli == 0 && mycpu()->intena)
    sti();
}

holding関数

この関数は引数のspinlock構造体のロックを現在のプロセッサが取っている場合に1を返す。

spinlock.c

int
holding(struct spinlock *lock)
{
  int r;
  pushcli();
  r = lock->locked && lock->cpu == mycpu();
  popcli();
  return r;
}

xchg関数

この関数はaddrにnewvalを代入し、元々入っていた値を呼び出し元に返す。 このとき、必ず単一のプロセッサのみが値の代入に成功する。

lock接頭辞とxchgl命令により、単一のプロセッサのみが値を変更できることを保証している。 命令的には値を代入するというより、交換する。 「Intel 64 and IA-32 architectures software developer's manual combined volumes: 1, 2A, 2B, 2C, 2D, 3A, 3B, 3C, 3D, and 4」(リンク8)によると、lock接頭辞は特定の命令あるいは出力がメモリである命令の場合に使用することができ、プロセッサのロッキングプロトコルを働かせることができる。 xchgl命令はlock接頭辞と組み合わせることが可能で、第一オペランドと第二オペランドの内容を交換する。

この関数の拡張インラインアセンブリの、入出力と破壊対象は次のようになっている。

出力: "+m" (*addr)"=a"(result)
入力: "1"(newval)
破壊対象: "cc"

使用されている各制約は「Using the GNU Compiler Collection (GCC)」(リンク3)に記載されている。 "cc" は「6.44.2 Extended Asm - Assembler Instructions with C Expression Operands」に記載されており、他の制約は「6.44.3 Constraints for asm Operands」の各項に記載されている。
出力について、 "+" は読み書きに使用されることを示し、 "m" はメモリのアドレスであることを示す。 また、 "=" は書き込みのみに使用されることを示し、 "a" は x86ではeaxレジスタを示す。
入力について、 "1" は %1 と同じものを指すので、eaxとなる。
破壊対象について、 "cc" はフラグレジスタを示す。 しかし、Intel SDM(リンク8)のlock接頭辞とxchg命令のFlags AffectedにはどちらもNoneが記載されているため、なぜこれが必要なのかはわからない。

xchg関数の動きをまとめると、入力としてnewvalをeaxに入れ、xchgl addr, %eaxとなり、addrとeaxの中身が交換され、eaxにはaddrに元々入っていた値が入る。 出力としてeaxをresultと紐づけているので、resultにはaddrに元々入っていた値が入る。

acquire関数では、この関数からの戻り値をwhileループの条件に使用している。 戻り値が1の場合は他のプロセッサがロックを取っていることになり、0の場合は誰もロックを取っていないということになる。

x86.h

static inline uint
xchg(volatile uint *addr, uint newval)
{
  uint result;

  // The + in "+m" denotes a read-modify-write operand.
  asm volatile("lock; xchgl %0, %1" :
               "+m" (*addr), "=a" (result) :
               "1" (newval) :
               "cc");
  return result;
}

getcallerpcs関数

この関数は引数pcs配列にコールスタックを作成する。

動作は関数の呼び出し規約が分かると分かりやすい。 呼び出し規約については「コンピュータの構成と設計 第4版」(書籍5)に説明があり、GCCにおける呼び出し規約は「Guide: Function Calling Conventions」(リンク22)にまとめられている。
この関数はまず変数ebpに現在のスタックフレームのベースアドレスを取得する。 スタックには引数pcs、引数v、リターンアドレス、ベースアドレスの順で積まれており、ひとつ16バイトで低いアドレスへ伸びていくので、引数vの積まれているアドレスから、-32バイトするとベースアドレスを得ることができる。
次にspinlock構造体の配列pcsは要素数が10なので、10個分までのスタックフレームのベースアドレスを入れるため、for文で10回ループする。 ベースアドレスをpcs配列に加え、リターンアドレスを変数ebpに代入し、次のループでebp[1]から前のスタックフレームのベースアドレスを取得し、pcs配列に加え、再びリターンアドレスをebpに代入していく。 ベースアドレスが0か、カーネル空間外を指している場合にループを抜ける。
スタックフレームが10個に満たなかった場合、2つめのfor文でpcs配列の残りのエントリに0を代入する。

spinlock.c

void
getcallerpcs(void *v, uint pcs[])
{
  uint *ebp;
  int i;

  ebp = (uint*)v - 2;
  for(i = 0; i < 10; i++){
    if(ebp == 0 || ebp < (uint*)KERNBASE || ebp == (uint*)0xffffffff)
      break;
    pcs[i] = ebp[1];     // saved %eip
    ebp = (uint*)ebp[0]; // saved %ebp
  }
  for(; i < 10; i++)
    pcs[i] = 0;
}

release関数

この関数はspinlock構造体のロックを解放する。

基本的にacquire関数の動作を反対から行う。 movl命令を用いたロック解放部分以外は上で確認した通り。

spinlock構造体のコールスタックとcpu構造体を0でリセットした後、__sync_synchronize関数でロック解放前と解放後の命令順序を保証する。
ロックの解放は、既に現在のプロセッサが当該ロックを取得していることがholding関数により保証されているため、xchg命令ではなく、move命令でlockedフィールドに0を代入する。 c言語で lk-\>locked = 0 とぜず、拡張インラインアセンブリを使用する理由は、「xv6 a simple, Unix-like teaching operating system」(リンク1)の4章「Code: Locks」で説明されており、c言語の仕様では特段単一の代入命令がatomicに行われることを明記していないため。
ロック解放後、popcli関数で必要に応じて割り込みを可能にする。

spinlock.c

void
release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");

  lk->pcs[0] = 0;
  lk->cpu = 0;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other cores before the lock is released.
  // Both the C compiler and the hardware may re-order loads and
  // stores; __sync_synchronize() tells them both not to.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code can't use a C assignment, since it might
  // not be atomic. A real OS would use C atomics here.
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );

  popcli();
}

スリープロック

このロックはロックが取れるまでプロセスをスリープ状態にしてプロセッサを明け渡す。 ディスクにアクセスする可能性がある場合等、長時間ロックを取得する可能性がある場合に使用する。

sleeplock構造体を使用する。 lockedフィールドでロックの状態を管理するが、独立性はspinlock構造体を使用して確保する。 また、後者はプロセスを起床させる際のチャネルとしても使用する。

スリープロックの流れは次の通り。
まだ誰も取得していないスリープロックを取得するとき:
lockedフィールドを1にし、pidフィールドに自分のpidを代入する。 ロック使用後は、lockedとpidを0にリセットし、同ロックを必要としている他のプロセスの状態を実行可能状態にする。

既に誰かが取得しているスリープロックを取得するとき:
自分(プロセス)のチャネルに取得したいスリープロックをセットし、スリープ状態になる。 スリープすると、コンテキストスイッチを行いスケジューラスレッドにプロセッサを明け渡す。
スリープロックが解放されると、それを取っていたプロセスがチャネルを通してスリープ状態のプロセス(自分)を実行可能状態にする。
あとはスケジューラスレッドが自分を実行してくれるまで待つ。
もしも他のプロセスも同じロックを取るためにスリープしている場合、スケジューラがそちらを先に実行してしまうと自分は再びスリープ状態となり、ロック解放まで再度待つことになる。

sleeplock.h

struct sleeplock {
  uint locked;       // Is the lock held?
  struct spinlock lk; // spinlock protecting this sleep lock
  
  // For debugging:
  char *name;        // Name of lock.
  int pid;           // Process holding lock
};

acquiresleep関数

この関数はスリープロックを取る。 ロックが取れるまで自身をスリープ状態とする。

同じスリープロックの取得が同時に行われないように、まずsleeplock構造体のlkフィールド(spinlock構造体)のロックを取る。
スリープする場合にはsleeplock構造体のアドレスを起床する際のチャネルとして用いる。
スリープロックが取られていない場合はスリープしない。

sleeplock.c

void
acquiresleep(struct sleeplock *lk)
{
  acquire(&lk->lk);
  while (lk->locked) {
    sleep(lk, &lk->lk);
  }
  lk->locked = 1;
  lk->pid = myproc()->pid;
  release(&lk->lk);
}

sleep関数

名前の通りスリープする。

もしも引数のspinlock構造体がプロセステーブルのロックではなかった場合、プロセステーブルのロックを取り、spinlock構造体のロックを解放する。 つまりプロセステーブルのロックを取りたい。 sched関数では、scheduler関数のプロセステーブルを走査するループの中、swtch関数の後のところに復帰する。 そこではプロセステーブルのロックが取得済みの前提で動作しており、ループ終了後にそのロックを解放する。 このためsleep関数内でsched関数を呼ぶ前にプロセステーブルのロックを取得しておく必要がある。

スリープする際には第一引数chanのアドレスをチャネルに設定する。 チャネルはproc構造体のchanフィールドで、他のプロセスがプロセステーブルから特定のプロセスを見つけるために使う。

ここではスリープとはsched関数内でスケジューラスレッドにコンテキストスイッチすることであるため、sched関数呼び出し以降は起床後に実行されることとなる。

proc.c

void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();
  
  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");

  // Must acquire ptable.lock in order to
  // change p->state and then call sched.
  // Once we hold ptable.lock, we can be
  // guaranteed that we won't miss any wakeup
  // (wakeup runs with ptable.lock locked),
  // so it's okay to release lk.
  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }
  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }
}

sched関数

sched関数はスケジューラスレッドにコンテキストスイッチする。

各if文でscheduler関数のプロセステーブルのループに戻るための準備ができているかを確認している。 準備ができていないとpanicしてしまう。 必要な準備は次の通り。

  • ページテーブルのロックを取得していること
  • pushcli関数が1回以上呼ばれていること
  • 自分のステータスが実行状態でないこと
  • 割り込みが無効になっていること(eflagsレジスタのInterrupt enableビット)

準備ができていることを確認後、自分のcpu構造体からintenaフィールドの値を退避する。 これは再びコンテキストスイッチされ今のプロセスに戻ってきた際に、intenaフィールドの値が変わっているかもしれないため。
そしてswtch関数でスケジューラスレッドにコンテキストスイッチする。 swtch関数については5.22. scheduler関数とmpmain関数に書く。 関数呼び出しのような要領でコンテキストスイッチを行う。
再び今のプロセスにコンテキストスイッチして戻ってきた際に、退避しておいたintenaフィールドの値を復元する。

proc.c

void
sched(void)
{
  int intena;
  struct proc *p = myproc();

  if(!holding(&ptable.lock))
    panic("sched ptable.lock");
  if(mycpu()->ncli != 1)
    panic("sched locks");
  if(p->state == RUNNING)
    panic("sched running");
  if(readeflags()&FL_IF)
    panic("sched interruptible");
  intena = mycpu()->intena;
  swtch(&p->context, mycpu()->scheduler);
  mycpu()->intena = intena;
}

releasesleep関数

この関数はスリープロックを解放し、同ロックを必要としているプロセスを起床させる。

同じsleeplock構造体のロックの取得が同時に行われないよう、lkフィールド(spinlock構造体)のロックを取る。
sleeplock構造体のアドレスをチャネルとして、wakeup関数でロックを必要としているプロセスを起床させる。

sleeplock.c

void
releasesleep(struct sleeplock *lk)
{
  acquire(&lk->lk);
  lk->locked = 0;
  lk->pid = 0;
  wakeup(lk);
  release(&lk->lk);
}

wakeup関数

wakeup関数は引数で渡されたチャネル(アドレス)を元に、スリープ状態のプロセスを実行可能状態にする。 実際のところはwakeup1関数を呼び出す。

プロセステーブルを走査し、同じチャネルを持っているスリープ状態のプロセスを探すため、プロセステーブルのロックを取る。
wakeup1関数にチャネルを渡し、起床処理を行う。

wakeup関数とwakeup1関数に分かれている理由は、呼び出し元が既にプロセステーブルのロックを取得している場合に対応するため。 プロセステーブルのロックを取っていない場合はwakeup関数を呼び出し、既にロックを取っている場合はwakeup1関数を呼び出す。
プロセステーブルを走査し、スリープ状態かつ同じチャネルを持っているプロセスがいる場合、そのプロセスの状態を実行可能状態にする。

proc.c

static void
wakeup1(void *chan)
{
  struct proc *p;

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}

// Wake up all processes sleeping on chan.
void
wakeup(void *chan)
{
  acquire(&ptable.lock);
  wakeup1(chan);
  release(&ptable.lock);
}