package edu.uprm.admg.nettraveler.sched;

import edu.uprm.admg.nettraveler.catalog.CatalogException;
import edu.uprm.admg.nettraveler.catalog.CatalogManager;
import edu.uprm.admg.nettraveler.catalog.RelationalCatalogManager;
import edu.uprm.admg.nettraveler.metadata.TableMD;
import edu.uprm.admg.nettraveler.plan.AccessPlan;
import edu.uprm.admg.nettraveler.plan.ExchangePlan;
import edu.uprm.admg.nettraveler.plan.ExecutionSite;
import edu.uprm.admg.nettraveler.plan.FetchPlan;
import edu.uprm.admg.nettraveler.plan.HashAccessPlan;
import edu.uprm.admg.nettraveler.plan.IllegalPlanException;
import edu.uprm.admg.nettraveler.plan.JoinPlan;
import edu.uprm.admg.nettraveler.plan.Plan;
import edu.uprm.admg.nettraveler.plan.ProjectPlan;
import edu.uprm.admg.nettraveler.plan.SelectPlan;
import edu.uprm.admg.nettraveler.plan.UnionPlan;
import edu.uprm.admg.nettraveler.predicate.BucketExpression;
import edu.uprm.admg.nettraveler.schema.Attribute;
import edu.uprm.admg.nettraveler.schema.ValueAttribute;
import edu.uprm.admg.nettraveler.type.MIInteger;
import edu.uprm.admg.util.DataVerify;
import edu.uprm.admg.util.hash.SplitTable;

abstract class AbstractScheduler implements Scheduler{
	protected Plan schedPlan = null;
	protected ExecutionSite[] sites = null;
	protected SplitTable stable = null;
	protected int numPartitions;
	protected SchedType type;
	protected SchedMode mode;
	protected boolean status = false;
	
	public AbstractScheduler(int numPartitions){
		DataVerify.VerifyIntegerParam("number of partitions", numPartitions, 0);
		this.numPartitions = numPartitions;
		this.stable = new SplitTable(this.numPartitions);
	}
	
	public boolean init(ExecutionSite[] sites, Plan plan)  throws SchedulerException {
		this.setExecutionSites(sites);
		this.setPlanSched(plan);
		//Add verify operatios for plan and sites
		//and if problems arises throw an Exception
		this.status = true;
		this.localInit();
		return true;
	}

	public boolean existPlansForSite(ExecutionSite site) {
		return this.localExist(site);
	}

	public Plan getPlanForSite(ExecutionSite site) {
		return this.localGetPlan(site);
	}
	
	public void schedule() throws SchedulerException{
		switch(this.mode){
			case RunTime:
				this.localRTSched();
				break;
			case OpTime:
				this.localOTSched();
				break;
			default:
				throw new SchedulerException("Mode not supported");
		}
	}

	public void setExecutionSites(ExecutionSite[] sites) {
		DataVerify.VerifyArrayParam("sites", sites);
		this.sites = new ExecutionSite[sites.length];
		System.arraycopy(sites, 0, this.sites, 0, this.sites.length);
	}

	public void setPlanSched(Plan plan) {
		DataVerify.VerifyObjectParam("plan", plan);
		this.schedPlan = plan;
	}
	
	public ExecutionSite[] getSites(){
		return this.sites;
	}
	
	public SchedType getType(){
		return this.type;
	}
	
	public SchedMode getMode(){
		return this.mode;
	}
	
	public void setMode(SchedMode mode){
		DataVerify.VerifyObjectParam("Mode",mode);
		this.mode = mode;
	}
	
	public void setType(SchedType type){
		DataVerify.VerifyObjectParam("Type",type);
		this.type = type;
	}
	
	//See if there is a need to eliminate this function on a near future;
	protected String getRelationName() throws SchedulerException{
		Plan access = this.getAccessPlan(this.schedPlan);
		return ((HashAccessPlan)access).getRelationName();
	}
	
	protected Plan getAccessPlan(Plan schedPlan) throws SchedulerException{
		if(schedPlan instanceof HashAccessPlan){
			return schedPlan;
		} else {
			Plan[] childPlans = schedPlan.getChildren();
			if(childPlans.length>1)
				throw new SchedulerException("Plan with more than one child is still unsupported");
			else if (childPlans.length==0){
				throw new SchedulerException("Plan still unsupported");
			}
			return this.getAccessPlan(childPlans[0]);
		}
	}
	
	/*
	 * Need to modify this function on a near future
	 * First: FetchPlan nees an execution site hardcoded
	 * Second: Plans with more than one child
	 * Third: Study Additional ExchangePlans interactions
	 * Fourth: Study a way to include JoinPlan
	 * Fifth: Find a way to include union plans
	 * 
	 */
	protected Plan updatePlan(ExecutionSite site,Plan assignedPlan, Plan modifiedPlan) throws SchedulerException{
		return this.updateHelper(site,assignedPlan, null, modifiedPlan);
	}
	
	private Plan updateHelper(ExecutionSite site, Plan plan, Plan parent, Plan modifiedPlan) throws SchedulerException{
		Plan result = null;
		Plan[] childPlans = plan.getChildren();
		if(childPlans.length>1)
			throw new SchedulerException("Plan with more than one child is still unsupported");
		Attribute[] baseAttribs = plan.getBaseAttr();
		int numResults = plan.getNumResults(); 
		if(plan instanceof AccessPlan) {
			throw new SchedulerException("AccessPlan is still unsupported for scheduling");
		} else if(plan instanceof FetchPlan){
			result = new FetchPlan(plan.getBaseAttr());
			ExecutionSite[] sites = plan.getExecutionSites();
			for(int i=0;i<sites.length;i++){
				result.addExecutionSite(sites[i]);
			}
		} else if (plan instanceof ExchangePlan){
			throw new SchedulerException("ExchangePlan is still unsupported for scheduling");
		} else if (plan instanceof HashAccessPlan){
			result = new HashAccessPlan(((HashAccessPlan)modifiedPlan).getRelationName(),baseAttribs);
			boolean prehashed = ((HashAccessPlan)modifiedPlan).isPreHashed();
			((HashAccessPlan)result).setPreHased(prehashed);
			if(!prehashed){
				((HashAccessPlan)result).setOnTheFlyHashPlan(((HashAccessPlan)modifiedPlan).getExpr(), ((HashAccessPlan)modifiedPlan).getValueAttribute());
			} else {
				((HashAccessPlan)result).setPreHashedPlan(((HashAccessPlan)modifiedPlan).getBuckets());
			}
			result.addExecutionSite(site);
			result.setParentPlan(parent);
		} else if (plan instanceof JoinPlan){
			throw new SchedulerException("JoinPlan is still unsupported for scheduling");
		} else if (plan instanceof ProjectPlan){
			result = new ProjectPlan(baseAttribs);
			try {
				((ProjectPlan)result).setProjAttrs(((ProjectPlan)plan).getProjAttrs());
			} catch (IllegalPlanException e) {
				throw new SchedulerException(e);
			}
			result.addExecutionSite(site);
		} else if (plan instanceof SelectPlan){
			result = new SelectPlan(baseAttribs);
			((SelectPlan)result).setExpression(((SelectPlan)plan).getExpression());
			((SelectPlan)result).setValueAttrs(((SelectPlan)plan).getValueAttrs());
			result.addExecutionSite(site);
		} else if (plan instanceof UnionPlan){
			throw new SchedulerException("Union is still unsupported for Scheduling");
		} else {
			throw new SchedulerException("Plan: "+plan.getClass().toString()+" is still unsupported for scheduling");
		}
		result.setNumResults(numResults);
		if(parent!=null)
			result.setParentPlan(parent);
		if(childPlans.length>0)
			result.addChild(this.updateHelper(site,childPlans[0],result,modifiedPlan));
		return result;
	}
	
	protected Plan setBuckets(ExecutionSite site, MIInteger[] buckets) throws SchedulerException{
		Plan localPlan = this.schedPlan;
		Plan hash = this.getAccessPlan(localPlan);
		boolean prehashed = ((HashAccessPlan)hash).isPreHashed();
		if(!prehashed){
			String relation = this.getRelationName();
			CatalogManager cm = null;
			TableMD relationMD = null;
			int[] keyIdx = null;
			int[] bucketIdx = null;
			int tableLength = 0;
			try {
				cm = RelationalCatalogManager.getInstance();
				relationMD = cm.getTableMD(relation);
				tableLength = relationMD.getNumColumns();
				//Hardcoding keys at the moment
//				ColumnMD[] keys = relationMD.getPrimaryKey();
//				keyIdx = new int[keys.length];
//				for(int j = 0; j<keys.length;j++){
//					keyIdx[j] = keys[j].getColumnIdx();
//				}
				keyIdx = new int[]{0};//hardcoded stufff
				bucketIdx = new int[buckets.length];
				for(int i = 0; i<bucketIdx.length;i++){
					bucketIdx[i] = tableLength++;
				}
				Attribute[] valAttribs = new ValueAttribute[buckets.length];
				for(int i = 0;i<valAttribs.length;i++){
					valAttribs[i] = new ValueAttribute(buckets[i].getClassName(),buckets[i]);
				}
				BucketExpression expr = new BucketExpression(keyIdx,this.numPartitions,bucketIdx);
				((HashAccessPlan)hash).setOnTheFlyHashPlan(expr, valAttribs);
			} catch (CatalogException e) {
				throw new SchedulerException(e);
			}
		} else {
			((HashAccessPlan)hash).setPreHashedPlan(buckets);
		}
		hash.setNumResults(100);//hardcoded
		return this.updatePlan(site,localPlan,hash);
	}
	
	protected abstract boolean localExist(ExecutionSite site);
	
	protected abstract Plan localGetPlan(ExecutionSite site);
	
	protected abstract void localRTSched() throws SchedulerException;
	
	protected abstract void localOTSched() throws SchedulerException;
	
	protected abstract void localInit() throws SchedulerException;
	
	protected abstract Plan assignBuckets(ExecutionSite site, int numBuckets) throws SchedulerException;
	
	/*
	 * Por ahora esta funcion es para corregir el error de
	 * serializacion que esta produciendo
	 * ver maneras para modificar el problema impuest
	 */
	public void destroy(){
		this.mode = null;
		this.schedPlan = null;
		this.sites = null;
		this.stable = null;
		this.type = null;
	}
	
}
