
YQL batch queries with multiple threads lose results

Hi All,

My team is working on a project that requires querying the web for around 50,000 queries at once, so we decided to use YQL's batch mode (query.multi) together with Java threads. The problem is that as we increased the number of threads, many query results were lost. For example, from a batch of 20 queries, only 5 or 6 were answered and the others were simply null values. The program we used is below. I would also like suggestions on how to send a large number of queries to YQL without much network overhead. Note that the input for our queries is only available as a text file, and in our scenario we cannot create Open Data Tables.

Thanks
Vandana
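One common way to send a large number of requests without losing results, assuming the nulls come from the service throttling or timing out when too many requests arrive at once (the post does not establish the cause), is to cap concurrency with a fixed-size thread pool and retry failed requests with exponential backoff. The sketch below is generic: each Callable<String> stands for one batch request, a batch counts as failed if it throws or returns null, and the class name BoundedRetryRunner, the pool size, attempt count, and delays are hypothetical, not YQL limits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: bounded concurrency plus retry with exponential backoff.
class BoundedRetryRunner {

    // Runs every task on a pool of poolSize threads; results are returned in input order.
    // A slot is null only if all maxAttempts attempts for that task failed.
    static List<String> runAll(List<Callable<String>> tasks, int poolSize,
                               int maxAttempts, long baseDelayMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> task : tasks) {
                futures.add(pool.submit(() -> callWithRetry(task, maxAttempts, baseDelayMillis)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    results.add(null); // only reached if the retry loop itself threw, e.g. on interrupt
                }
            }
            return results;
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    // Assumption: an exception or a null result both mean "no answer", so both are retried.
    static String callWithRetry(Callable<String> task, int maxAttempts, long baseDelayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                String result = task.call();
                if (result != null) {
                    return result;
                }
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                // Treat as a failed attempt and fall through to the backoff below.
            }
            if (attempt < maxAttempts) {
                // Wait baseDelay, 2*baseDelay, 4*baseDelay, ... before the next attempt.
                Thread.sleep(baseDelayMillis << (attempt - 1));
            }
        }
        return null;
    }
}
```

Results are collected from the futures in input order, so no thread appends to a shared buffer. In the program in this post, a task could wrap the body of YQLThread1.run() and return the JSON text, or return null when the response contains null sub-results so that the batch is retried.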


// Description of the YQL program

/* YQLDemo reads the input file, builds YQL queries from it, and stores them in the array batches[].
The program then spawns threads; each thread sends one batch using the query.multi feature, and all threads run in parallel for better performance.
Output is stored in JSON format.
Microsoft Bing is used with YQL because only Bing provides the web.count parameter (the number of hits for a search query).
The number of threads is set by THREADNUM, and the number of queries per query.multi batch is set by batchSize.
Output is printed to the console and also saved to the file YQLOutput1.txt.
Problem faced: when the number of threads running in parallel is increased, many queries go unanswered. Unanswered queries show up as nulls in the output. */


import org.json.JSONObject;
import java.io.*;
import java.net.URL;
import java.util.concurrent.PriorityBlockingQueue;


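public class YQLDemo {
    // URL fragments, already percent-encoded: %20 = space, %3D = '=', %22 = '"', %3B = ';'
    public static String startString = "http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20query.multi%20where%20queries%3D%22";
    public static String startbing = "select%20Web.Total%20from%20microsoft.bing%20where%20query%3D";
    public static String endbing = "and%20source%3D'web'";
    public static String endquote = "%22";
    public static String endString = "&format=json&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys";
    // PairT1 is not shown in this post; PriorityBlockingQueue requires it to implement Comparable<PairT1>.
    public static PriorityBlockingQueue<PairT1> pbq = new PriorityBlockingQueue<PairT1>();
    public static String seperator = "%3B";
    public static int batchSize = 5;
    public static int THREADNUM = 100;
    public static InputStream[] rstream = new InputStream[1000];
    // Every worker thread appends to this buffer, so it must be thread-safe:
    // StringBuffer synchronizes append(), whereas a shared StringBuilder can lose or corrupt output.
    public static StringBuffer sb = new StringBuffer();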

    public static void main(String[] args) throws Exception {
        // Each input line is split on spaces: fields 0-2 are integers and field 3 is the query string.
        BufferedReader br = new BufferedReader(new FileReader(new File("YQLInput.txt")));
        String line1 = br.readLine();
        String query[] = null;
        while (line1 != null) {
            query = line1.split(" ");
            pbq.put(new PairT1(query[3], Integer.parseInt(query[2]), Integer.parseInt(query[0]), Integer.parseInt(query[1])));
            line1 = br.readLine();
        }
        br.close();
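        String s = YQLDemo.startString;
        PairT1 pt = null;
        int k = 0;
        int j = 0;
        int counter = 0;
        String[] batches = new String[1000];

        // Construction of YQL queries: each batch joins up to batchSize Bing sub-queries with ';'

        while (!pbq.isEmpty()) {
            k = 0;
            s = startString;
            // Also stop when the queue runs out: remove() on an empty queue throws
            // NoSuchElementException, so the last batch may hold fewer than batchSize queries.
            while (k < YQLDemo.batchSize && !pbq.isEmpty()) {
                pt = YQLDemo.pbq.remove();
                if (k > 0) {
                    s = s + YQLDemo.seperator; // separator goes between sub-queries only
                }
                s = s + YQLDemo.startbing + pt.query + YQLDemo.endbing;
                k++;
                j++;
            }
            s = s + YQLDemo.endquote + YQLDemo.endString;
            batches[counter] = s;
            counter++;
            if (counter >= batches.length) {
                break; // batches[] is full; any remaining queries are not sent
            }
        }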

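        // Creation of threads: each thread executes one YQL query.
        // Only start as many threads as there are batches; with fewer batches than THREADNUM,
        // batches[count] would be null and that thread would fail with an exception.
        // Batches beyond the first THREADNUM are never sent.

        int threadCount = Math.min(THREADNUM, counter);
        YQLThread1[] yql = new YQLThread1[threadCount];
        long startTime = System.currentTimeMillis();
        int count = 0;
        for (int i = 0; i < threadCount; i++) {
            yql[i] = new YQLThread1(batches[count], count);
            yql[i].start();
            count++;
        }
        for (int i = 0; i < threadCount; i++) {
            yql[i].join();
        }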
        BufferedWriter bw = new BufferedWriter(new FileWriter("YQLOutput1.txt"));
        bw.write(sb.toString());
        bw.newLine();
        bw.close();
        long endTime = System.currentTimeMillis();
        System.out.println("\n Total time: " + (endTime - startTime) / 1000 + " s");
    }

}


class YQLThread1 extends Thread {
    String batch;
    int index;

    public YQLThread1(String batch, int i) {
        this.batch = batch;
        this.index = i;
    }

    public void run() {
        try {
            URL url = new URL(batch);
            YQLDemo.rstream[index] = url.openStream();
            // try-with-resources closes the reader (and the underlying stream) even if parsing fails
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(YQLDemo.rstream[index], "UTF-8"))) {
                String str;
                StringBuilder buffer = new StringBuilder(); // local to this thread, so StringBuilder is fine here
                while ((str = br.readLine()) != null) {
                    buffer.append(str);
                }
                JSONObject json = new JSONObject(buffer.toString());
                System.out.println("\n" + json + "\t\t");
                YQLDemo.sb.append(json + "\n"); // sb is shared by all threads
            }
        } catch (Exception exp) {
            // A failed request (bad URL, HTTP error, timeout) is only logged here,
            // so its whole batch is missing from the output.
            exp.printStackTrace();
        }
    }
}
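The batch construction in YQLDemo concatenates hand-encoded URL fragments. A minimal alternative sketch, assuming the same query.multi and microsoft.bing query text as the program and that each search term should be wrapped in single quotes, writes each YQL statement in plain text and lets java.net.URLEncoder percent-encode it; the last batch simply holds whatever terms are left over. The class YqlBatchUrls, its helper methods, and stripping single quotes from terms are hypothetical choices for illustration. URLEncoder uses form encoding, so spaces become '+' rather than %20, which servers normally decode as spaces in a query string.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds one query.multi URL per group of at most batchSize search terms.
class YqlBatchUrls {
    static final String ENDPOINT = "http://query.yahooapis.com/v1/public/yql";
    static final String ENV = "store://datatables.org/alltableswithkeys";

    // One Bing sub-query, same shape as startbing/endbing in YQLDemo.
    // Assumption: the term is wrapped in single quotes, and any single quotes inside it are dropped.
    static String bingQuery(String term) {
        return "select Web.Total from microsoft.bing where query='"
                + term.replace("'", "") + "' and source='web'";
    }

    static List<String> buildBatchUrls(List<String> terms, int batchSize)
            throws UnsupportedEncodingException {
        List<String> urls = new ArrayList<>();
        for (int start = 0; start < terms.size(); start += batchSize) {
            // The last group takes whatever terms are left, so it may be smaller than batchSize.
            int end = Math.min(start + batchSize, terms.size());
            List<String> subQueries = new ArrayList<>();
            for (String term : terms.subList(start, end)) {
                subQueries.add(bingQuery(term));
            }
            String yql = "select * from query.multi where queries=\""
                    + String.join(";", subQueries) + "\"";
            // Form encoding: spaces become '+', and characters such as " = ' ; : / are percent-encoded.
            urls.add(ENDPOINT + "?q=" + URLEncoder.encode(yql, "UTF-8")
                    + "&format=json&env=" + URLEncoder.encode(ENV, "UTF-8"));
        }
        return urls;
    }
}
```

With seven terms and a batchSize of 5, this produces two URLs: the first with five sub-queries and the second with the remaining two.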
