2011-03-05 6 views
11

que tienen una necesidad de un objeto semáforo de un solo permiso en mi programa Java, donde hay un método de adquisición adicional que se parece a esto:¿Cómo puedo escribir un semáforo en Java que priorice los solicitantes exitosos anteriores?

boolean tryAcquire(int id) 

y se comporta de la siguiente manera: Si el ID no se ha encontrado antes, luego recuérdalo y haz lo que haga java.util.concurrent.Semaphore. Si el ID tiene encontrado antes de y ese encuentro resultó en el arrendamiento del permiso, entonces le da prioridad a este subproceso sobre todos los otros hilos que pueden estar esperando el permiso. Yo también querrá un método de liberación extra como:

void release(int id) 

el que hace lo hace el java.util.concurrent.Semaphore, además también se "olvida" de la ID.

que no se sabe muy bien cómo abordar esto, pero aquí es el inicio de una posible implementación, pero me temo que va a ninguna parte:

public final class SemaphoreWithMemory { 

    private final Semaphore semaphore = new Semaphore(1, true); 
    private final Set<Integer> favoured = new ConcurrentSkipListSet<Integer>(); 

    public boolean tryAcquire() { 
     return semaphore.tryAcquire(); 
    } 

    public synchronized boolean tryAcquire(int id) { 
     if (!favoured.contains(id)) { 
      boolean gotIt = tryAcquire(); 
      if (gotIt) { 
       favoured.add(id); 
       return true; 
      } 
      else { 
       return false; 
      } 
     } 
     else { 
      // what do I do here??? 
     } 
    } 

    public void release() { 
     semaphore.release(); 
    } 

    public synchronized void release(int id) { 
     favoured.remove(id); 
     semaphore.release(); 
    } 

} 
+0

¿No esta mejor ser resuelto con algo parecido a una cola de prioridad concurrente? –

+1

No está claro cómo 'tryAcquire()' está relacionado con "hilos en espera" ya que no bloquea. – axtavt

+0

@Andrew sí, tal vez, no sé cómo funciona uno de esos. – jjujuma

Respuesta

1

EDIT:
hecho un poco de experimento. Por favor, consulte this answer para obtener resultados.

En principio, Semaphore tiene una cola de hilos internamente, por lo que, como dice Andrew, si convierte esta cola en una cola prioritaria y encuesta de esta fila para otorgar permisos, probablemente se comporte de la manera que desee. Tenga en cuenta que no puede hacer esto con tryAcquire porque los hilos de esa manera no hacen cola. Por lo que veo parece que tendrías que hackear la clase AbstractQueuedSynchronizer para hacer esto.

Yo también podría pensar en un enfoque probabilístico, como esto: (!. No estoy diciendo que el código de abajo sería una buena idea sólo de intercambio de ideas aquí)

public class SemaphoreWithMemory { 

    private final Semaphore semaphore = new Semaphore(1); 
    private final Set<Integer> favoured = new ConcurrentSkipListSet<Integer>(); 
    private final ThreadLocal<Random> rng = //some good rng 

    public boolean tryAcquire() { 
     for(int i=0; i<8; i++){ 
      Thread.yield(); 
      // Tend to waste more time than tryAcquire(int id) 
      // would waste. 
      if(rng.get().nextDouble() < 0.3){ 
       return semaphore.tryAcquire(); 
      } 
     } 
     return semaphore.tryAcquire(); 
    } 

    public boolean tryAcquire(int id) { 
     if (!favoured.contains(id)) { 
      boolean gotIt = semaphore.tryAcquire(); 
      if (gotIt) { 
       favoured.add(id); 
       return true; 
      } else { 
       return false; 
      } 
     } else { 
      return tryAquire(); 
    } 
} 

O tener el " favorecidos" hilos cuelgan hacia fuera un poco más como esto:
EDIT: resulta que esto era una idea muy mala (con tanto semáforo justo y no justo) (ver mi experiment para más detalles

public boolean tryAcquire(int id) { 
     if (!favoured.contains(id)) { 
      boolean gotIt = semaphore.tryAcquire(5,TimeUnit.MILLISECONDS); 
      if (gotIt) { 
       favoured.add(id); 
       return true; 
      } else { 
       return false; 
      } 
     } else { 
      return tryAquire(); 
    } 
.

Supongo que de esta manera puede sesgar la forma en que se emiten los permisos, mientras que no será justo. Aunque con este código probablemente perderías mucho tiempo en rendimiento ...

0

Suponiendo que quieres que los hilos esperen, pirateé una solución que no es perfecta, pero que debería hacer.

La idea es tener dos semáforos y un indicador de "el favorito está esperando".

Cada hilo que intente adquirir el SemaphoreWithMemory primero intenta adquirir el "semáforo preferido". Un hilo "favorecido" mantiene el semáforo y un usuario no favorecido lo libera inmediatamente. De este modo, el hilo favorito bloquea todos los otros hilos entrantes una vez que ha adquirido este semáforo.

Luego debe adquirirse el segundo "semáforo normal" para finalizar. Pero el subproceso no favorecido vuelve a verificar que no hay un subproceso favorito esperando usar una variable volátil).Si ninguno está esperando, simplemente continúa; si uno está esperando, libera el semáforo normal y las llamadas recursivas vuelven a adquirir.

No estoy muy seguro de que no haya condiciones de carrera al acecho. Si quiere estar seguro, quizás deba refactorizar su código a mano de "elementos de trabajo" a una cola de prioridad, donde otro hilo toma el elemento de trabajo con la más alta prioridad y ejecuta ese código.

public final class SemaphoreWithMemory { 

private volatile boolean favouredAquired = false; 
private final Semaphore favouredSemaphore = new Semaphore(1, true); 
private final Semaphore normalSemaphore = new Semaphore(1, true); 
private final Set<Integer> favoured = new ConcurrentSkipListSet<Integer>(); 

public void acquire() throws InterruptedException { 
    normalSemaphore.acquire(); 
} 

public void acquire(int id) throws InterruptedException { 
    boolean idIsFavoured = favoured.contains(id); 
    favouredSemaphore.acquire(); 
    if (!idIsFavoured) { 
     favouredSemaphore.release(); 
    } else { 
     favouredAquired = true; 
    } 
    normalSemaphore.acquire(); 

    // check again that there is no favoured thread waiting 
    if (!idIsFavoured) { 
     if (favouredAquired) { 
      normalSemaphore.release(); 
      acquire(); // starving probability! 
     } else { 
      favoured.add(id); 
     } 
    } 

} 

public void release() { 
    normalSemaphore.release(); 
    if (favouredAquired) { 
     favouredAquired = false; 
     favouredSemaphore.release(); 
    } 
} 

public void release(int id) { 
    favoured.remove(id); 
    release(); 
} 

} 
0

leí este artículo de Ceki y se interesó por cómo podría ser la adquisición de semáforos sesgada (que no me sentía el comportamiento "bloqueo parcial" tendría sentido en los semáforos, así ..). En mi hardware con 2 procesadores y Sun JVM 1.6, en realidad resulta en un arrendamiento bastante uniforme.

De todos modos, también traté de "sesgar" el leasing de semáforo con la estrategia que escribí en mi otra respuesta. Resulta un simple extra yield declaración solo resulta en un sesgo significativo. Su problema es más complicado, pero tal vez usted puede hacer pruebas similares con su idea y ver lo que hay :)

NOTA El código siguiente se basa en el código de Ceki here

Código:

import java.util.concurrent.*; 

public class BiasedSemaphore implements Runnable { 
    static ThreadLocal<Boolean> favored = new ThreadLocal<Boolean>(){ 
     private boolean gaveOut = false; 
     public synchronized Boolean initialValue(){ 
      if(!gaveOut){ 
       System.out.println("Favored " + Thread.currentThread().getName()); 
       gaveOut = true; 
       return true; 
      } 
      return false; 
     } 
    }; 

    static int THREAD_COUNT = Runtime.getRuntime().availableProcessors(); 
    static Semaphore SEM = new Semaphore(1); 
    static Runnable[] RUNNABLE_ARRAY = new Runnable[THREAD_COUNT]; 
    static Thread[] THREAD_ARRAY = new Thread[THREAD_COUNT]; 

    private int counter = 0; 

    public static void main(String args[]) throws InterruptedException { 
     printEnvironmentInfo(); 
     execute(); 
     printResults(); 
    } 

    public static void printEnvironmentInfo() { 
     System.out.println("java.runtime.version = " 
       + System.getProperty("java.runtime.version")); 
     System.out.println("java.vendor   = " 
       + System.getProperty("java.vendor")); 
     System.out.println("java.version   = " 
       + System.getProperty("java.version")); 
     System.out.println("os.name    = " 
       + System.getProperty("os.name")); 
     System.out.println("os.version   = " 
       + System.getProperty("os.version")); 
    } 

    public static void execute() throws InterruptedException { 
     for (int i = 0; i < THREAD_COUNT; i++) { 
      RUNNABLE_ARRAY[i] = new BiasedSemaphore(); 
      THREAD_ARRAY[i] = new Thread(RUNNABLE_ARRAY[i]); 
      System.out.println("Runnable at "+i + " operated with "+THREAD_ARRAY[i]); 
     } 

     for (Thread t : THREAD_ARRAY) { 
      t.start(); 
     } 
     // let the threads run for a while 
     Thread.sleep(10000); 

     for (int i = 0; i< THREAD_COUNT; i++) { 
      THREAD_ARRAY[i].interrupt(); 
     } 

     for (Thread t : THREAD_ARRAY) { 
      t.join(); 
     } 
    } 

    public static void printResults() { 
     System.out.println("Ran with " + THREAD_COUNT + " threads"); 
     for (int i = 0; i < RUNNABLE_ARRAY.length; i++) { 
      System.out.println("runnable[" + i + "]: " + RUNNABLE_ARRAY[i]); 
     } 
    } 


    public void run() { 
     while (!Thread.currentThread().isInterrupted()) { 
      if (favored.get()) { 
       stuff(); 
      } else { 
       Thread.yield(); 
//    try { 
//     Thread.sleep(1); 
//    } catch (InterruptedException e) { 
//     Thread.currentThread().interrupt(); 
//    } 
       stuff(); 
      } 
     } 
    } 

    private void stuff() { 
     if (SEM.tryAcquire()) { 
      //favored.set(true); 
      counter++; 
      try { 
       Thread.sleep(10); 
      } catch (InterruptedException ex) { 
       Thread.currentThread().interrupt(); 
      } 
      SEM.release(); 
     } else { 
      //favored.set(false); 
     } 
    } 

    public String toString() { 
     return "counter=" + counter; 
    } 
} 

resultados:

java.runtime.version = 1.6.0_21-b07 
java.vendor   = Sun Microsystems Inc. 
java.version   = 1.6.0_21 
os.name    = Windows Vista 
os.version   = 6.0 
Runnable at 0 operated with Thread[Thread-0,5,main] 
Runnable at 1 operated with Thread[Thread-1,5,main] 
Favored Thread-0 
Ran with 2 threads 
runnable[0]: counter=503 
runnable[1]: counter=425 

intentado con 30 segundos en lugar de 10:

java.runtime.version = 1.6.0_21-b07 
java.vendor   = Sun Microsystems Inc. 
java.version   = 1.6.0_21 
os.name    = Windows Vista 
os.version   = 6.0 
Runnable at 0 operated with Thread[Thread-0,5,main] 
Runnable at 1 operated with Thread[Thread-1,5,main] 
Favored Thread-1 
Ran with 2 threads 
runnable[0]: counter=1274 
runnable[1]: counter=1496 

P.S .: Parece que "salir" fue una muy mala idea. Cuando intenté llamar a SEM.tryAcquire(1,TimeUnit.MILLISECONDS); para hilos favoritos y SEM.tryAcquire() para hilos no favorecidos, ¡hilos no favorecidos obtuvieron el permiso casi 5 veces más que el hilo favorito!

Además, me gustaría agregar que estos resultados solo se miden en una situación particular, por lo que no está claro cómo se comportan estas medidas en otras situaciones.

+0

'Semaphore.tryAquire()' sin tiempo de espera es un método no justo, que omite la cola si hay un permiso disponible. Esto a menudo es bueno si la imparcialidad no es un problema, ya que evita un cambio de contexto. –

0

Me parece que la forma más sencilla de hacerlo es no intentar combinar Semaphores, sino construirlo desde cero sobre los monitores. En general, esto es riesgoso, pero en este caso, como no hay buenos elementos en java.util.concurrent, es la forma más clara de hacerlo.

Esto es lo que ocurrió:

public class SemaphoreWithMemory { 

    private final Set<Integer> favouredIDs = new HashSet<Integer>(); 
    private final Object favouredLock = new Object(); 
    private final Object ordinaryLock = new Object(); 
    private boolean available = true; 
    private int favouredWaiting = 0; 

    /** 
    Acquires the permit. Blocks until the permit is acquired. 
    */ 
    public void acquire(int id) throws InterruptedException { 
     Object lock; 
     boolean favoured = false; 

     synchronized (this) { 
      // fast exit for uncontended lock 
      if (available) { 
       doAcquire(favoured, id); 
       return; 
      } 
      favoured = favouredIDs.contains(id); 
      if (favoured) { 
       lock = favouredLock; 
       ++favouredWaiting; 
      } 
      else { 
       lock = ordinaryLock; 
      } 
     } 

     while (true) { 
      synchronized (this) { 
       if (available) { 
        doAcquire(favoured, id); 
        return; 
       } 
      } 
      synchronized (lock) { 
       lock.wait(); 
      } 
     } 
    } 

    private void doAcquire(boolean favoured, int id) { 
     available = false; 
     if (favoured) --favouredWaiting; 
     else favouredIDs.add(id); 
    } 

    /** 
    Releases the permit. 
    */ 
    public synchronized void release() { 
     available = true; 
     Object lock = (favouredWaiting > 0) ? favouredLock : ordinaryLock; 
     synchronized (lock) { 
      lock.notify(); 
     } 
    } 

} 
1

Para el bloqueo modelo de adquisición, lo que acerca de esto:

public class SemWithPreferred { 
    int max; 
    int avail; 
    int preferredThreads; 

    public SemWithPreferred(int max, int avail) { 
     this.max = max; 
     this.avail = avail; 
    } 

    synchronized public void get(int id) throws InterruptedException { 
     boolean thisThreadIsPreferred = idHasBeenServedSuccessfullyBefore(id); 
     if (thisThreadIsPreferred) { 
      preferredThreads++; 
     } 
     while (! (avail > 0 && (preferredThreads == 0 || thisThreadIsPreferred))) { 
      wait(); 
     } 
     System.out.println(String.format("granted, id = %d, preferredThreads = %d", id, preferredThreads)); 
     avail -= 1; 
     if (thisThreadIsPreferred) { 
      preferredThreads--; 
      notifyAll(); // removal of preferred thread could affect other threads' wait predicate 
     } 
    } 

    synchronized public void put() { 
     if (avail < max) { 
      avail += 1; 
      notifyAll(); 
     } 
    } 

    boolean idHasBeenServedSuccessfullyBefore(int id) { 
     // stubbed out, this just treats any id that is a 
     // multiple of 5 as having been served successfully before 
     return id % 5 == 0; 
    } 
} 
Cuestiones relacionadas