PsyBlockingQueue.java

package coneforest.psylla.core;

import coneforest.psylla.runtime.*;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;

/**
*	The representation of {@code blockingqueue}.
*/
@Type("blockingqueue")
public final class PsyBlockingQueue
	implements
		PsyFormalQueue<PsyObject>,
		PsyCloseable
{
	/**
	*	Context action of the {@code blockingqueue} operator.
	*/
	@OperatorType("blockingqueue")
	public static final ContextAction PSY_BLOCKINGQUEUE
		=ContextAction.<PsyInteger>ofFunction(PsyBlockingQueue::new);

	private final ArrayBlockingQueue<PsyObject> queue;

	private boolean closed=false;

	public PsyBlockingQueue(final PsyInteger oCapacity)
		throws PsyRangeCheckException, PsyLimitCheckException
	{
		final var capacity=oCapacity.longValue();
		if(capacity>=Integer.MAX_VALUE)
			throw new PsyLimitCheckException();
		try
		{
			queue=new ArrayBlockingQueue<>((int)capacity);
		}
		catch(final IllegalArgumentException ex)
		{
			throw new PsyRangeCheckException();
		}
	}

	/**
	*	{@return the number of elements in this queue}
	*/
	@Override
	public int length()
	{
		return queue.size();
	}

	@Override
	public void psyGive(final PsyObject o)
		throws PsyInterruptException
	{
		try
		{
			queue.put(o);
		}
		catch(final InterruptedException ex)
		{
			throw new PsyInterruptException();
		}
	}

	@Override
	public void psyEnqueue(final PsyObject o)
		throws PsyInvalidStateException
	{
		try
		{
			queue.add(o);
		}
		catch(final IllegalStateException ex)
		{
			throw new PsyInvalidStateException();
		}
	}

	@Override
	public PsyObject psyDequeue()
		throws PsyInvalidStateException
	{
		try
		{
			return queue.remove();
		}
		catch(final IllegalStateException ex)
		{
			throw new PsyInvalidStateException();
		}
	}

	@Override
	public PsyObject psyTake()
		throws PsyInterruptException
	{
		try
		{
			return queue.take();
		}
		catch(final InterruptedException ex)
		{
			throw new PsyInterruptException();
		}
	}

	@Override
	public void psyClose()
	{
		closed=true;
	}

	@Override
	public void psyClear()
	{
		queue.clear();
	}

	@Override
	public Iterator<PsyObject> iterator()
	{
		return queue.iterator();
	}

	@Override
	public int capacity()
	{
		return queue.remainingCapacity()+length();
	}

	@Override
	public PsyStream psyStream()
	{
		return new PsyStream(queue.stream());
	}
}